fix: fix tasks usage accounting initializer
[poolifier.git] / src / pools / worker-node.ts
1 import { CircularArray } from '../circular-array'
2 import { Queue } from '../queue'
3 import {
4 type IWorker,
5 type IWorkerNode,
6 type Task,
7 type WorkerInfo,
8 type WorkerType,
9 WorkerTypes,
10 type WorkerUsage
11 } from './worker'
12
13 /**
14 * Worker node.
15 *
16 * @typeParam Worker - Type of worker.
17 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
18 */
19 export class WorkerNode<Worker extends IWorker, Data = unknown>
20 implements IWorkerNode<Worker, Data> {
21 public readonly worker: Worker
22 public readonly info: WorkerInfo
23 public usage: WorkerUsage
24 private readonly tasksUsage: Map<string, WorkerUsage>
25 private readonly tasksQueue: Queue<Task<Data>>
26
27 /**
28 * Constructs a new worker node.
29 *
30 * @param worker - The worker.
31 * @param workerType - The worker type.
32 */
33 constructor (worker: Worker, workerType: WorkerType) {
34 this.worker = worker
35 this.info = this.initWorkerInfo(worker, workerType)
36 this.usage = this.initWorkerUsage()
37 this.tasksUsage = new Map<string, WorkerUsage>()
38 this.tasksQueue = new Queue<Task<Data>>()
39 }
40
41 /** @inheritdoc */
42 public tasksQueueSize (): number {
43 return this.tasksQueue.size
44 }
45
46 /**
47 * Worker node tasks queue maximum size.
48 *
49 * @returns The tasks queue maximum size.
50 */
51 private tasksQueueMaxSize (): number {
52 return this.tasksQueue.maxSize
53 }
54
55 /** @inheritdoc */
56 public enqueueTask (task: Task<Data>): number {
57 return this.tasksQueue.enqueue(task)
58 }
59
60 /** @inheritdoc */
61 public dequeueTask (): Task<Data> | undefined {
62 return this.tasksQueue.dequeue()
63 }
64
65 /** @inheritdoc */
66 public clearTasksQueue (): void {
67 this.tasksQueue.clear()
68 }
69
70 /** @inheritdoc */
71 public resetUsage (): void {
72 this.usage = this.initWorkerUsage()
73 this.tasksUsage.clear()
74 }
75
76 /** @inheritdoc */
77 public getTasksWorkerUsage (name: string): WorkerUsage | undefined {
78 if (!this.tasksUsage.has(name)) {
79 this.tasksUsage.set(name, this.initTaskWorkerUsage(name))
80 }
81 return this.tasksUsage.get(name)
82 }
83
84 private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
85 return {
86 id: this.getWorkerId(worker, workerType),
87 type: workerType,
88 dynamic: false,
89 ready: false
90 }
91 }
92
93 private initWorkerUsage (): WorkerUsage {
94 const getTasksQueueSize = (): number => {
95 return this.tasksQueueSize()
96 }
97 const getTasksQueueMaxSize = (): number => {
98 return this.tasksQueueMaxSize()
99 }
100 return {
101 tasks: {
102 executed: 0,
103 executing: 0,
104 get queued (): number {
105 return getTasksQueueSize()
106 },
107 get maxQueued (): number {
108 return getTasksQueueMaxSize()
109 },
110 failed: 0
111 },
112 runTime: {
113 history: new CircularArray()
114 },
115 waitTime: {
116 history: new CircularArray()
117 },
118 elu: {
119 idle: {
120 history: new CircularArray()
121 },
122 active: {
123 history: new CircularArray()
124 }
125 }
126 }
127 }
128
129 private initTaskWorkerUsage (name: string): WorkerUsage {
130 const getTaskQueueSize = (): number => {
131 let taskQueueSize = 0
132 for (const task of this.tasksQueue) {
133 if (task.name === name) {
134 ++taskQueueSize
135 }
136 }
137 return taskQueueSize
138 }
139 return {
140 tasks: {
141 executed: 0,
142 executing: 0,
143 get queued (): number {
144 return getTaskQueueSize()
145 },
146 failed: 0
147 },
148 runTime: {
149 history: new CircularArray()
150 },
151 waitTime: {
152 history: new CircularArray()
153 },
154 elu: {
155 idle: {
156 history: new CircularArray()
157 },
158 active: {
159 history: new CircularArray()
160 }
161 }
162 }
163 }
164
165 /**
166 * Gets the worker id.
167 *
168 * @param worker - The worker.
169 * @param workerType - The worker type.
170 * @returns The worker id.
171 */
172 private getWorkerId (
173 worker: Worker,
174 workerType: WorkerType
175 ): number | undefined {
176 if (workerType === WorkerTypes.thread) {
177 return worker.threadId
178 } else if (workerType === WorkerTypes.cluster) {
179 return worker.id
180 }
181 }
182 }