Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import type { Task } from '../utility-types'
4 import {
5 DEFAULT_TASK_NAME,
6 EMPTY_FUNCTION,
7 exponentialDelay,
8 getWorkerId,
9 getWorkerType,
10 sleep
11 } from '../utils'
12 import { Deque } from '../deque'
13 import {
14 type IWorker,
15 type IWorkerNode,
16 type StrategyData,
17 type WorkerInfo,
18 type WorkerNodeEventCallback,
19 type WorkerType,
20 WorkerTypes,
21 type WorkerUsage
22 } from './worker'
23
24 /**
25 * Worker node.
26 *
27 * @typeParam Worker - Type of worker.
28 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
29 */
30 export class WorkerNode<Worker extends IWorker, Data = unknown>
31 implements IWorkerNode<Worker, Data> {
32 /** @inheritdoc */
33 public readonly worker: Worker
34 /** @inheritdoc */
35 public readonly info: WorkerInfo
36 /** @inheritdoc */
37 public usage: WorkerUsage
38 /** @inheritdoc */
39 public strategyData?: StrategyData
40 /** @inheritdoc */
41 public messageChannel?: MessageChannel
42 /** @inheritdoc */
43 public tasksQueueBackPressureSize: number
44 /** @inheritdoc */
45 public onBackPressure?: WorkerNodeEventCallback
46 /** @inheritdoc */
47 public onEmptyQueue?: WorkerNodeEventCallback
48 private readonly tasksQueue: Deque<Task<Data>>
49 private onBackPressureStarted: boolean
50 private onEmptyQueueCount: number
51 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
52
53 /**
54 * Constructs a new worker node.
55 *
56 * @param worker - The worker.
57 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
58 */
59 constructor (worker: Worker, tasksQueueBackPressureSize: number) {
60 this.checkWorkerNodeArguments(worker, tasksQueueBackPressureSize)
61 this.worker = worker
62 this.info = this.initWorkerInfo(worker)
63 this.usage = this.initWorkerUsage()
64 if (this.info.type === WorkerTypes.thread) {
65 this.messageChannel = new MessageChannel()
66 }
67 this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
68 this.tasksQueue = new Deque<Task<Data>>()
69 this.onBackPressureStarted = false
70 this.onEmptyQueueCount = 0
71 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
72 }
73
74 /** @inheritdoc */
75 public tasksQueueSize (): number {
76 return this.tasksQueue.size
77 }
78
79 /** @inheritdoc */
80 public enqueueTask (task: Task<Data>): number {
81 const tasksQueueSize = this.tasksQueue.push(task)
82 if (
83 this.onBackPressure != null &&
84 this.hasBackPressure() &&
85 !this.onBackPressureStarted
86 ) {
87 this.onBackPressureStarted = true
88 this.onBackPressure(this.info.id as number)
89 this.onBackPressureStarted = false
90 }
91 return tasksQueueSize
92 }
93
94 /** @inheritdoc */
95 public unshiftTask (task: Task<Data>): number {
96 const tasksQueueSize = this.tasksQueue.unshift(task)
97 if (
98 this.onBackPressure != null &&
99 this.hasBackPressure() &&
100 !this.onBackPressureStarted
101 ) {
102 this.onBackPressureStarted = true
103 this.onBackPressure(this.info.id as number)
104 this.onBackPressureStarted = false
105 }
106 return tasksQueueSize
107 }
108
109 /** @inheritdoc */
110 public dequeueTask (): Task<Data> | undefined {
111 const task = this.tasksQueue.shift()
112 if (
113 this.onEmptyQueue != null &&
114 this.tasksQueue.size === 0 &&
115 this.onEmptyQueueCount === 0
116 ) {
117 this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
118 }
119 return task
120 }
121
122 /** @inheritdoc */
123 public popTask (): Task<Data> | undefined {
124 const task = this.tasksQueue.pop()
125 if (
126 this.onEmptyQueue != null &&
127 this.tasksQueue.size === 0 &&
128 this.onEmptyQueueCount === 0
129 ) {
130 this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
131 }
132 return task
133 }
134
135 /** @inheritdoc */
136 public clearTasksQueue (): void {
137 this.tasksQueue.clear()
138 }
139
140 /** @inheritdoc */
141 public hasBackPressure (): boolean {
142 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
143 }
144
145 /** @inheritdoc */
146 public resetUsage (): void {
147 this.usage = this.initWorkerUsage()
148 this.taskFunctionsUsage.clear()
149 }
150
151 /** @inheritdoc */
152 public closeChannel (): void {
153 if (this.messageChannel != null) {
154 this.messageChannel?.port1.unref()
155 this.messageChannel?.port2.unref()
156 this.messageChannel?.port1.close()
157 this.messageChannel?.port2.close()
158 delete this.messageChannel
159 }
160 }
161
162 /** @inheritdoc */
163 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
164 if (!Array.isArray(this.info.taskFunctionNames)) {
165 throw new Error(
166 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
167 )
168 }
169 if (
170 Array.isArray(this.info.taskFunctionNames) &&
171 this.info.taskFunctionNames.length < 3
172 ) {
173 throw new Error(
174 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
175 )
176 }
177 if (name === DEFAULT_TASK_NAME) {
178 name = this.info.taskFunctionNames[1]
179 }
180 if (!this.taskFunctionsUsage.has(name)) {
181 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
182 }
183 return this.taskFunctionsUsage.get(name)
184 }
185
186 private async startOnEmptyQueue (): Promise<void> {
187 if (
188 this.onEmptyQueueCount > 0 &&
189 (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
190 ) {
191 this.onEmptyQueueCount = 0
192 return
193 }
194 ++this.onEmptyQueueCount
195 this.onEmptyQueue?.(this.info.id as number)
196 await sleep(exponentialDelay(this.onEmptyQueueCount))
197 await this.startOnEmptyQueue()
198 }
199
200 private initWorkerInfo (worker: Worker): WorkerInfo {
201 return {
202 id: getWorkerId(worker),
203 type: getWorkerType(worker) as WorkerType,
204 dynamic: false,
205 ready: false
206 }
207 }
208
209 private initWorkerUsage (): WorkerUsage {
210 const getTasksQueueSize = (): number => {
211 return this.tasksQueue.size
212 }
213 const getTasksQueueMaxSize = (): number => {
214 return this.tasksQueue.maxSize
215 }
216 return {
217 tasks: {
218 executed: 0,
219 executing: 0,
220 get queued (): number {
221 return getTasksQueueSize()
222 },
223 get maxQueued (): number {
224 return getTasksQueueMaxSize()
225 },
226 stolen: 0,
227 failed: 0
228 },
229 runTime: {
230 history: new CircularArray()
231 },
232 waitTime: {
233 history: new CircularArray()
234 },
235 elu: {
236 idle: {
237 history: new CircularArray()
238 },
239 active: {
240 history: new CircularArray()
241 }
242 }
243 }
244 }
245
246 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
247 const getTaskFunctionQueueSize = (): number => {
248 let taskFunctionQueueSize = 0
249 for (const task of this.tasksQueue) {
250 if (
251 (task.name === DEFAULT_TASK_NAME &&
252 name === (this.info.taskFunctionNames as string[])[1]) ||
253 (task.name !== DEFAULT_TASK_NAME && name === task.name)
254 ) {
255 ++taskFunctionQueueSize
256 }
257 }
258 return taskFunctionQueueSize
259 }
260 return {
261 tasks: {
262 executed: 0,
263 executing: 0,
264 get queued (): number {
265 return getTaskFunctionQueueSize()
266 },
267 stolen: 0,
268 failed: 0
269 },
270 runTime: {
271 history: new CircularArray()
272 },
273 waitTime: {
274 history: new CircularArray()
275 },
276 elu: {
277 idle: {
278 history: new CircularArray()
279 },
280 active: {
281 history: new CircularArray()
282 }
283 }
284 }
285 }
286
287 private checkWorkerNodeArguments (
288 worker: Worker,
289 tasksQueueBackPressureSize: number
290 ): void {
291 if (worker == null) {
292 throw new TypeError('Cannot construct a worker node without a worker')
293 }
294 if (tasksQueueBackPressureSize == null) {
295 throw new TypeError(
296 'Cannot construct a worker node without a tasks queue back pressure size'
297 )
298 }
299 if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
300 throw new TypeError(
301 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
302 )
303 }
304 if (tasksQueueBackPressureSize <= 0) {
305 throw new RangeError(
306 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
307 )
308 }
309 }
310 }