fix: fix race condition at counting executing tasks on worker node
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import { Queue } from '../queue'
4 import type { Task } from '../utility-types'
5 import { DEFAULT_TASK_NAME } from '../utils'
6 import {
7 type IWorker,
8 type IWorkerNode,
9 type WorkerInfo,
10 type WorkerType,
11 WorkerTypes,
12 type WorkerUsage
13 } from './worker'
14
/**
 * Worker node.
 *
 * Groups a pool worker with its descriptive info, its usage statistics
 * (overall and per task function) and its own queue of pending tasks.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public usage: WorkerUsage
  /** Usage statistics keyed by task function name, created lazily on first lookup. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>
  /** Tasks waiting to be handed to the worker. */
  private readonly tasksQueue: Queue<Task<Data>>
  /** Queue size from which the node reports back pressure (pool maximum size squared). */
  private readonly tasksQueueBackPressureSize: number

  /**
   * Constructs a new worker node.
   *
   * A message channel is created only when the worker type is a thread.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @param poolMaxSize - The pool maximum size.
   */
  constructor (worker: Worker, workerType: WorkerType, poolMaxSize: number) {
    this.worker = worker
    this.info = this.initWorkerInfo(worker, workerType)
    if (workerType === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
    this.tasksQueue = new Queue<Task<Data>>()
    this.tasksQueueBackPressureSize = poolMaxSize ** 2
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    const { size } = this.tasksQueue
    return size
  }

  /**
   * Tasks queue maximum size, as reported by the underlying queue.
   *
   * @returns The tasks queue maximum size.
   */
  private tasksQueueMaxSize (): number {
    const { maxSize } = this.tasksQueue
    return maxSize
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    // The queue's own return value is forwarded unchanged.
    return this.tasksQueue.enqueue(task)
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    return this.tasksQueue.dequeue()
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    const queued = this.tasksQueueSize()
    return queued >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    // Both the overall usage and every per task function usage start over.
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    const channel = this.messageChannel
    if (channel == null) {
      // Nothing to close: no channel was created or it is already closed.
      return
    }
    const { port1, port2 } = channel
    port1.unref()
    port2.unref()
    port1.close()
    port2.close()
    delete this.messageChannel
  }

  /** @inheritdoc */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctions = this.info.taskFunctions
    if (!Array.isArray(taskFunctions)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctions.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is tracked under the name stored at index 1 of the list.
    const usageKey = name === DEFAULT_TASK_NAME ? taskFunctions[1] : name
    const knownUsage = this.taskFunctionsUsage.get(usageKey)
    if (knownUsage !== undefined) {
      return knownUsage
    }
    this.taskFunctionsUsage.set(
      usageKey,
      this.initTaskFunctionWorkerUsage(usageKey)
    )
    return this.taskFunctionsUsage.get(usageKey)
  }

  /**
   * Builds the initial worker info: not dynamic and not ready.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The initial worker info.
   */
  private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
    const id = this.getWorkerId(worker, workerType)
    return { id, type: workerType, dynamic: false, ready: false }
  }

  /**
   * Builds a zeroed worker usage whose `queued` and `maxQueued` counters are
   * read live from the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // Arrow functions keep `this` bound to the node inside the object literal getters.
    const readQueued = (): number => this.tasksQueueSize()
    const readMaxQueued = (): number => this.tasksQueueMaxSize()
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return readQueued()
        },
        get maxQueued (): number {
          return readMaxQueued()
        },
        failed: 0
      },
      runTime: { history: new CircularArray() },
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    }
  }

  /**
   * Builds a zeroed usage for one task function. Its `queued` counter counts,
   * on each read, the queued tasks whose name equals the given name.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const countQueuedWithName = (): number => {
      let matching = 0
      for (const queuedTask of this.tasksQueue) {
        matching += queuedTask.name === name ? 1 : 0
      }
      return matching
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return countQueuedWithName()
        },
        failed: 0
      },
      runTime: { history: new CircularArray() },
      waitTime: { history: new CircularArray() },
      elu: {
        idle: { history: new CircularArray() },
        active: { history: new CircularArray() }
      }
    }
  }

  /**
   * Gets the worker id.
   *
   * @param worker - The worker.
   * @param workerType - The worker type.
   * @returns The worker `threadId` for a thread worker, the worker `id` for a cluster worker, `undefined` otherwise.
   */
  private getWorkerId (
    worker: Worker,
    workerType: WorkerType
  ): number | undefined {
    switch (workerType) {
      case WorkerTypes.thread:
        return worker.threadId
      case WorkerTypes.cluster:
        return worker.id
      default:
        return undefined
    }
  }
}