fix: ensure tasks redistribution avoids task execution starvation
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
7c89e6a4 13 isAsyncFunction,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
e102732c
JB
31import type {
32 IWorker,
4b628b48 33 IWorkerNode,
8a1260a3 34 WorkerInfo,
4b628b48 35 WorkerType,
e102732c
JB
36 WorkerUsage
37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The task execution response promise map.
 *
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks.
 *
 * An entry is added for each task submitted through `execute()`.
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 *
 * Instantiated in the constructor from the pool options.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>

/**
 * Whether the pool is starting or not.
 *
 * Only `true` while the constructor is creating the initial worker nodes.
 */
private readonly starting: boolean
/**
 * The start timestamp of the pool.
 *
 * Taken from `performance.now()` at the end of the constructor and used to compute the pool utilization.
 */
private readonly startTimestamp
95
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs the setup hook
 * and finally creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not created from the main worker or if one of the arguments is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Arguments validation: each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  // Also fills in the default values of the unset pool options.
  this.checkPoolOptions(this.opts)

  // Bind the methods to this pool instance so that `this` stays correct when they are invoked detached.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled: usages must go through optional chaining.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Must run before any worker node is created.
  this.setupHook()

  // The starting flag prevents the worker error handler from restarting workers while the pool is starting.
  this.starting = true
  this.startPool()
  this.starting = false

  // Reference point for the pool utilization computation.
  this.startTimestamp = performance.now()
}
142
/**
 * Checks that the worker file path is a non blank string referencing an existing file.
 *
 * @param filePath - The worker file path to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the file path is missing, not a string, blank or if the file does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null`, `undefined` and any non string value fail the `typeof` test.
  const isNonBlankString =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
155
/**
 * Checks that the number of workers is a safe, non negative integer and not zero for a fixed pool.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is forbidden to be empty.
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
173
/**
 * Checks the consistency of the minimum and maximum sizes of a dynamic pool.
 * Does nothing if the pool is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is lower than the minimum size, equal to zero or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
191
/**
 * Checks the pool options and stores them, with defaults applied, in the pool options.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)
  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and stored when the tasks queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
218
/**
 * Checks that the given worker choice strategy is one of the known worker choice strategies.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownWorkerChoiceStrategies = Object.values(WorkerChoiceStrategies)
  if (knownWorkerChoiceStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
228
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, there must be exactly one per possible worker node.
  const weightsMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (weightsMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const unknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (unknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
256
/**
 * Checks the tasks queue options. Unset options and unset concurrency are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
280
/**
 * Starts the pool: creates worker nodes until the number of non dynamic worker nodes reaches the configured number of workers.
 */
private startPool (): void {
  // Recomputed at each iteration since every created worker node is added to the pool worker nodes.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
292
/** @inheritDoc */
public get info (): PoolInfo {
  // Task statistics required by the current worker choice strategy: they gate the optional fields of the pool information.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a numeric value picked from each worker node.
  const sumWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )
  const executedTasks = sumWorkerNodes(
    workerNode => workerNode.usage.tasks.executed
  )
  // Aggregates a measurement (run time or wait time) over all the worker nodes.
  const buildMeasurementInfo = (
    measurement: 'runTime' | 'waitTime'
  ): {
    minimum: number
    maximum: number
    average: number
    median?: number
  } => ({
    minimum: round(
      Math.min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      Math.max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    // Report a zero average instead of NaN (0 / 0) as long as no task has been executed.
    average: round(
      executedTasks > 0
        ? sumWorkerNodes(
          workerNode => workerNode.usage[measurement]?.aggregate ?? 0
        ) / executedTasks
        : 0
    ),
    ...(taskStatisticsRequirements[measurement].median && {
      median: round(
        median(
          this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.median ?? 0
          )
        )
      )
    })
  })
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // The utilization needs both the run time and the wait time aggregates.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: this.workerNodes.filter(
      workerNode => workerNode.usage.tasks.executing === 0
    ).length,
    busyWorkerNodes: this.workerNodes.filter(
      workerNode => workerNode.usage.tasks.executing > 0
    ).length,
    executedTasks,
    executingTasks: sumWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumWorkerNodes(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sumWorkerNodes(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumWorkerNodes(workerNode => workerNode.usage.tasks.failed),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: buildMeasurementInfo('runTime')
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: buildMeasurementInfo('waitTime')
    })
  }
}
08f3f44c 429
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of ready, non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
444
/**
 * The approximate pool utilization.
 *
 * Ratio between the tasks run time plus wait time aggregated over the worker nodes
 * and the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
465
/**
 * The pool type.
 *
 * Compared against `PoolTypes.fixed` and `PoolTypes.dynamic` by the pool size checks
 * and by the dynamic worker creation condition, and reported in the pool information.
 */
protected abstract get type (): PoolType
472
/**
 * The worker type.
 *
 * Reported as `worker` in the pool information.
 */
protected abstract get worker (): WorkerType
477
/**
 * The pool minimum size.
 *
 * The pool is considered ready once at least that many non dynamic worker nodes are ready.
 */
protected abstract get minSize (): number
ff733df7
JB
482
/**
 * The pool maximum size.
 *
 * The pool is full once the number of worker nodes reaches it. It is also the expected
 * number of worker choice strategy weights and the multiplier of the pool time capacity.
 */
protected abstract get maxSize (): number
a35560ba 487
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 * A message without worker id is accepted.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  // The worker id is valid only if one of the pool worker nodes owns it.
  if (this.getWorkerNodeKeyByWorkerId(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
504
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
516
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined {
  const workerNodeKey = this.workerNodes.findIndex(
    workerNode => workerNode.info.id === workerId
  )
  // Translate the `findIndex()` not found marker into `undefined`.
  return workerNodeKey !== -1 ? workerNodeKey : undefined
}
530
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and send it the worker statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  }
}
549
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first: nothing is stored when the options are invalid.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
560
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the tasks queues when the tasks queue goes from enabled to disabled.
  const isDisablingTasksQueue =
    this.opts.enableTasksQueue === true && !enable
  if (isDisablingTasksQueue) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
572
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Tasks queue disabled: drop the stored tasks queue options, if any.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
583
/**
 * Builds the tasks queue options with defaults applied.
 *
 * @param tasksQueueOptions - The tasks queue options to complete.
 * @returns The tasks queue options, with a concurrency of `1` if none was specified.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
591
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 600
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 * When the tasks queue is enabled, a submitted task is enqueued instead of executed if the pool is busy.
 */
protected abstract get busy (): boolean
7c0ba920 607
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns Worker nodes busyness boolean status: `false` as soon as one worker node executes no task.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
620
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // The timestamp is taken before the worker node choice: it is the task wait time reference.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      id: randomUUID()
    }
    // The execution response promise is settled later, when the worker response for that task id is received.
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Enqueue only if the tasks queue is enabled and the pool is busy or the chosen worker node reached its tasks concurrency.
    const shallEnqueueTask =
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[workerNodeKey].usage.tasks.executing >=
          ((this.opts.tasksQueueOptions as TasksQueueOptions)
            .concurrency as number))
    if (shallEnqueueTask) {
      this.enqueueTask(workerNodeKey, task)
    } else {
      this.executeTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 653
/** @inheritDoc */
public async destroy (): Promise<void> {
  const workerNodeTerminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // The exit listener is registered before the worker node destruction is requested.
      const workerExited = new Promise<void>(resolve => {
        workerNode.worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorkerNode(workerNodeKey)
      await workerExited
    }
  )
  await Promise.all(workerNodeTerminations)
}
670
/**
 * Terminates the worker node given its worker node key.
 *
 * The implementation can be synchronous or asynchronous: callers must handle both.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (
  workerNodeKey: number
): void | Promise<void>
c97c7edb 679
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 689
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws if it returns `false`: a pool cannot be started from a worker.
 */
protected abstract isMain (): boolean
694
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics,
 * first on the worker node usage, then on the worker node usage of the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const accountTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  accountTaskStart(workerNode.usage)
  accountTaskStart(
    workerNode.getTaskWorkerUsage(task.name as string) as WorkerUsage
  )
}
715
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and event loop utilization statistics,
 * first on the worker node usage, then on the worker node usage of the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const accountTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  accountTaskEnd(workerNode.usage)
  accountTaskEnd(
    workerNode.getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
738
/**
 * Updates the tasks counters of the given worker usage once a task execution response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message: a defined `taskError` marks the task as failed.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const workerTaskStatistics = workerUsage.tasks
  // Never let the executing tasks counter go below zero: the idle and busy worker nodes
  // detection relies on `executing === 0`. The usage can be reset while tasks are still
  // in flight, see the `resetUsage()` call in `setWorkerChoiceStrategy()`.
  // NOTE(review): assumes `resetUsage()` zeroes the tasks counters - confirm.
  if (workerTaskStatistics.executing > 0) {
    --workerTaskStatistics.executing
  }
  if (message.taskError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
751
/**
 * Updates the run time statistics of the given worker usage with the task run time of the received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message. A missing task run time counts as `0`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeStatistics = workerUsage.runTime
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    runTimeStatistics,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
763
/**
 * Updates the wait time statistics of the given worker usage with the time elapsed since the task submission.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed. A missing task timestamp counts as a wait time of `0`.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = task.timestamp != null ? now - task.timestamp : 0
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
777
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage with the ELU of the received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message. Missing active or idle ELU values count as `0`.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  const workerElu = workerUsage.elu
  updateMeasurementStatistics(
    workerElu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerElu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  // The utilization is only tracked if the ELU aggregate is required and the message carries an ELU.
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // The first value is stored as is, the next ones are averaged with the current value.
  workerElu.utilization =
    workerElu.utilization != null
      ? (workerElu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
809
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * A dynamic worker node is created first if the conditions are met: it is the chosen one
 * only if the strategy policy allows to use dynamic workers.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
828
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and all its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
837
/**
 * Sends a message to the worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): void
848
/**
 * Creates a new worker.
 *
 * The returned worker is not yet registered in the pool worker nodes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 855
/**
 * Creates a new, completely set up worker node.
 *
 * Creates the worker, registers its event handlers, adds it to the pool worker nodes
 * and runs the after worker node setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // The handlers from the pool options fall back to a no-op when they are not defined.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling, run after the error handler from the pool options.
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    // Flag the worker node as not ready and close its channel.
    workerInfo.ready = false
    this.workerNodes[workerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No worker restart while the pool is starting.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      // The replacement worker node is of the same kind as the errored one.
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    // Done after the replacement worker node creation, presumably so that it can receive some of the queued tasks.
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Remove the worker node from the pool once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
be0676b3 895
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener destroys the worker node
 * when a kill message is received from its worker.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    // The worker node key is looked up again at each message from the worker id.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    ) as number
    const { tasks } = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is honored only if the worker node executes no task and, with the tasks queue enabled, has no queued task.
    const isIdleKill = (): boolean =>
      message.kill != null &&
      tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    if (!isKillBehavior(KillBehaviors.HARD, message.kill) && !isIdleKill()) {
      return
    }
    // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
    const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
    if (isAsyncFunction(destroyWorkerNodeBounded)) {
      (
        destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
      )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
    } else {
      (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
        localWorkerNodeKey
      )
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.dynamic = true
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  // The dynamic worker node is flagged ready right away if the strategy policy uses it for the next task.
  if (useDynamicWorker) {
    workerInfo.ready = true
  }
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  return workerNodeKey
}
941
/**
 * Attaches a message listener callback to the worker designated by its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The key of the worker node whose worker gets the listener.
 * @param listener - The callback receiving the worker messages.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
954
/**
 * Hook invoked once a worker node has been newly created.
 * Subclasses may override it.
 *
 * @param workerNodeKey - The key of the newly created worker node.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Start listening to the messages sent by the worker.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, listener)
  // Then send the startup message, followed by the worker statistics message.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
969
/**
 * Sends the startup message to the worker designated by its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The key of the worker node to send the startup message to.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
976
/**
 * Sends the worker statistics message to the worker designated by its worker node key.
 * The message tells the worker whether the run time and the ELU statistics are
 * required, according to the task statistics requirements of the worker choice
 * strategy context.
 *
 * @param workerNodeKey - The key of the worker node to send the statistics message to.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: requirements.runTime.aggregate,
    elu: requirements.elu.aggregate
  }
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, { statistics, workerId })
}
a2ed5053
JB
994
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the first other ready worker node with an empty tasks
 * queue is chosen; if that worker node is not executing any task either, the
 * task is executed on it right away instead of being queued. Otherwise the
 * task is queued on the other ready worker node with the fewest queued tasks.
 *
 * If there is no other ready worker node, redistribution stops and the
 * remaining tasks are left in the source worker node queue.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // -1 means that no eligible destination worker node has been found.
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An idle worker node can run the task immediately, which avoids starving it in a queue.
        executeTask = workerNode.usage.tasks.executing === 0
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other ready worker node: re-enqueueing the task on the source worker
      // node would never drain its queue and would loop forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1035
/**
 * Builds the listener registered for each worker message.
 * The listener checks the message worker id, then dispatches worker ready
 * responses and task execution responses to their handlers.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // The worker answered with its ready status.
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // The worker answered with a task execution result.
      this.handleTaskExecutionResponse(message)
    }
  }
}
1053
/**
 * Handles a worker ready response: stores the ready status on the worker
 * information of the sending worker, then emits the pool ready event if an
 * emitter exists and the pool is ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(
    message.workerId
  ) as number
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1062
/**
 * Handles a task execution response: settles the promise registered for the
 * task, runs the after task execution hook, forgets the promise, executes the
 * next queued task of the worker node if the tasks queue is enabled and
 * updates the worker choice strategy context.
 * Messages whose id has no registered promise response are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    // The worker node just finished a task: feed it the next queued one.
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1087
/**
 * Emits the pool busy event when the pool is busy and the pool full event
 * when a dynamic pool is full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1098
/**
 * Retrieves the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1108
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // While the pool is starting, worker nodes are flagged as ready upfront.
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKey(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
c923ce56 1129
/**
 * Deletes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker has no
 * worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKey(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1142
/**
 * Runs the before task execution hook, then sends the task to the worker
 * designated by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The hook must run before the task is handed over to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task)
}
1153
/**
 * Enqueues the task on the worker node designated by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1157
/**
 * Dequeues a task from the worker node designated by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1161
/**
 * Gets the tasks queue size of the worker node designated by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1165
/**
 * Executes every task queued on the worker node designated by its key, then
 * clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  const workerNode = this.workerNodes[workerNodeKey]
  while (workerNode.tasksQueueSize() > 0) {
    const queuedTask = workerNode.dequeueTask() as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  workerNode.clearTasksQueue()
}
1175
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1181}