52829c28163637f97947343b88aab983ab1a101f
[poolifier.git] / src / pools / abstract-pool.ts
1 import { AsyncResource } from 'node:async_hooks'
2 import { randomUUID } from 'node:crypto'
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { performance } from 'node:perf_hooks'
5 import type { TransferListItem } from 'node:worker_threads'
6
7 import type {
8 MessageValue,
9 PromiseResponseWrapper,
10 Task,
11 TaskFunctionProperties
12 } from '../utility-types.js'
13 import {
14 average,
15 buildTaskFunctionProperties,
16 DEFAULT_TASK_NAME,
17 EMPTY_FUNCTION,
18 exponentialDelay,
19 isKillBehavior,
20 isPlainObject,
21 max,
22 median,
23 min,
24 round,
25 sleep
26 } from '../utils.js'
27 import type {
28 TaskFunction,
29 TaskFunctionObject
30 } from '../worker/task-functions.js'
31 import { KillBehaviors } from '../worker/worker-options.js'
32 import {
33 type IPool,
34 PoolEvents,
35 type PoolInfo,
36 type PoolOptions,
37 type PoolType,
38 PoolTypes,
39 type TasksQueueOptions
40 } from './pool.js'
41 import {
42 Measurements,
43 WorkerChoiceStrategies,
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
46 } from './selection-strategies/selection-strategies-types.js'
47 import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
48 import {
49 checkFilePath,
50 checkValidPriority,
51 checkValidTasksQueueOptions,
52 checkValidWorkerChoiceStrategy,
53 getDefaultTasksQueueOptions,
54 updateEluWorkerUsage,
55 updateRunTimeWorkerUsage,
56 updateTaskStatisticsWorkerUsage,
57 updateWaitTimeWorkerUsage,
58 waitWorkerNodeEvents
59 } from './utils.js'
60 import { version } from './version.js'
61 import type {
62 IWorker,
63 IWorkerNode,
64 WorkerInfo,
65 WorkerNodeEventDetail,
66 WorkerType
67 } from './worker.js'
68 import { WorkerNode } from './worker-node.js'
69
70 /**
71 * Base class that implements some shared logic for all poolifier pools.
72 *
73 * @typeParam Worker - Type of worker which manages this pool.
74 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
75 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
76 */
77 export abstract class AbstractPool<
78 Worker extends IWorker,
79 Data = unknown,
80 Response = unknown
81 > implements IPool<Worker, Data, Response> {
82 /** @inheritDoc */
83 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
84
85 /** @inheritDoc */
86 public emitter?: EventEmitterAsyncResource
87
88 /**
89 * The task execution response promise map:
90 * - `key`: The message id of each submitted task.
91 * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
92 *
93 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
94 */
95 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
96 new Map<string, PromiseResponseWrapper<Response>>()
97
98 /**
99 * Worker choice strategies context referencing worker choice algorithms implementation.
100 */
101 protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
102 Worker,
103 Data,
104 Response
105 >
106
107 /**
108 * The task functions added at runtime map:
109 * - `key`: The task function name.
110 * - `value`: The task function object.
111 */
112 private readonly taskFunctions: Map<
113 string,
114 TaskFunctionObject<Data, Response>
115 >
116
117 /**
118 * Whether the pool is started or not.
119 */
120 private started: boolean
121 /**
122 * Whether the pool is starting or not.
123 */
124 private starting: boolean
125 /**
126 * Whether the pool is destroying or not.
127 */
128 private destroying: boolean
129 /**
130 * Whether the minimum number of workers is starting or not.
131 */
132 private startingMinimumNumberOfWorkers: boolean
133 /**
134 * Whether the pool ready event has been emitted or not.
135 */
136 private readyEventEmitted: boolean
137 /**
138 * The start timestamp of the pool.
139 */
140 private readonly startTimestamp
141
142 /**
143 * Constructs a new poolifier pool.
144 *
145 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
146 * @param filePath - Path to the worker file.
147 * @param opts - Options for the pool.
148 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
149 */
150 public constructor (
151 protected readonly minimumNumberOfWorkers: number,
152 protected readonly filePath: string,
153 protected readonly opts: PoolOptions<Worker>,
154 protected readonly maximumNumberOfWorkers?: number
155 ) {
156 if (!this.isMain()) {
157 throw new Error(
158 'Cannot start a pool from a worker with the same type as the pool'
159 )
160 }
161 this.checkPoolType()
162 checkFilePath(this.filePath)
163 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
164 this.checkPoolOptions(this.opts)
165
166 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
167 this.executeTask = this.executeTask.bind(this)
168 this.enqueueTask = this.enqueueTask.bind(this)
169
170 if (this.opts.enableEvents === true) {
171 this.initializeEventEmitter()
172 }
173 this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
174 Worker,
175 Data,
176 Response
177 >(
178 this,
179 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
180 [this.opts.workerChoiceStrategy!],
181 this.opts.workerChoiceStrategyOptions
182 )
183
184 this.setupHook()
185
186 this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()
187
188 this.started = false
189 this.starting = false
190 this.destroying = false
191 this.readyEventEmitted = false
192 this.startingMinimumNumberOfWorkers = false
193 if (this.opts.startWorkers === true) {
194 this.start()
195 }
196
197 this.startTimestamp = performance.now()
198 }
199
200 private checkPoolType (): void {
201 if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
202 throw new Error(
203 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
204 )
205 }
206 }
207
208 private checkMinimumNumberOfWorkers (
209 minimumNumberOfWorkers: number | undefined
210 ): void {
211 if (minimumNumberOfWorkers == null) {
212 throw new Error(
213 'Cannot instantiate a pool without specifying the number of workers'
214 )
215 } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
216 throw new TypeError(
217 'Cannot instantiate a pool with a non safe integer number of workers'
218 )
219 } else if (minimumNumberOfWorkers < 0) {
220 throw new RangeError(
221 'Cannot instantiate a pool with a negative number of workers'
222 )
223 } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
224 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
225 }
226 }
227
228 private checkPoolOptions (opts: PoolOptions<Worker>): void {
229 if (isPlainObject(opts)) {
230 this.opts.startWorkers = opts.startWorkers ?? true
231 checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
232 this.opts.workerChoiceStrategy =
233 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
234 this.checkValidWorkerChoiceStrategyOptions(
235 opts.workerChoiceStrategyOptions
236 )
237 if (opts.workerChoiceStrategyOptions != null) {
238 this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
239 }
240 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
241 this.opts.enableEvents = opts.enableEvents ?? true
242 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
243 if (this.opts.enableTasksQueue) {
244 checkValidTasksQueueOptions(opts.tasksQueueOptions)
245 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
246 opts.tasksQueueOptions
247 )
248 }
249 } else {
250 throw new TypeError('Invalid pool options: must be a plain object')
251 }
252 }
253
254 private checkValidWorkerChoiceStrategyOptions (
255 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
256 ): void {
257 if (
258 workerChoiceStrategyOptions != null &&
259 !isPlainObject(workerChoiceStrategyOptions)
260 ) {
261 throw new TypeError(
262 'Invalid worker choice strategy options: must be a plain object'
263 )
264 }
265 if (
266 workerChoiceStrategyOptions?.weights != null &&
267 Object.keys(workerChoiceStrategyOptions.weights).length !==
268 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
269 ) {
270 throw new Error(
271 'Invalid worker choice strategy options: must have a weight for each worker node'
272 )
273 }
274 if (
275 workerChoiceStrategyOptions?.measurement != null &&
276 !Object.values(Measurements).includes(
277 workerChoiceStrategyOptions.measurement
278 )
279 ) {
280 throw new Error(
281 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
282 )
283 }
284 }
285
286 private initializeEventEmitter (): void {
287 this.emitter = new EventEmitterAsyncResource({
288 name: `poolifier:${this.type}-${this.worker}-pool`
289 })
290 }
291
292 /** @inheritDoc */
293 public get info (): PoolInfo {
294 return {
295 version,
296 type: this.type,
297 worker: this.worker,
298 started: this.started,
299 ready: this.ready,
300 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
301 defaultStrategy: this.opts.workerChoiceStrategy!,
302 strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0,
303 minSize: this.minimumNumberOfWorkers,
304 maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
305 ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
306 .runTime.aggregate === true &&
307 this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
308 .waitTime.aggregate && {
309 utilization: round(this.utilization)
310 }),
311 workerNodes: this.workerNodes.length,
312 idleWorkerNodes: this.workerNodes.reduce(
313 (accumulator, workerNode) =>
314 workerNode.usage.tasks.executing === 0
315 ? accumulator + 1
316 : accumulator,
317 0
318 ),
319 ...(this.opts.enableTasksQueue === true && {
320 stealingWorkerNodes: this.workerNodes.reduce(
321 (accumulator, workerNode) =>
322 workerNode.info.stealing ? accumulator + 1 : accumulator,
323 0
324 )
325 }),
326 busyWorkerNodes: this.workerNodes.reduce(
327 (accumulator, _workerNode, workerNodeKey) =>
328 this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
329 0
330 ),
331 executedTasks: this.workerNodes.reduce(
332 (accumulator, workerNode) =>
333 accumulator + workerNode.usage.tasks.executed,
334 0
335 ),
336 executingTasks: this.workerNodes.reduce(
337 (accumulator, workerNode) =>
338 accumulator + workerNode.usage.tasks.executing,
339 0
340 ),
341 ...(this.opts.enableTasksQueue === true && {
342 queuedTasks: this.workerNodes.reduce(
343 (accumulator, workerNode) =>
344 accumulator + workerNode.usage.tasks.queued,
345 0
346 )
347 }),
348 ...(this.opts.enableTasksQueue === true && {
349 maxQueuedTasks: this.workerNodes.reduce(
350 (accumulator, workerNode) =>
351 accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
352 0
353 )
354 }),
355 ...(this.opts.enableTasksQueue === true && {
356 backPressure: this.hasBackPressure()
357 }),
358 ...(this.opts.enableTasksQueue === true && {
359 stolenTasks: this.workerNodes.reduce(
360 (accumulator, workerNode) =>
361 accumulator + workerNode.usage.tasks.stolen,
362 0
363 )
364 }),
365 failedTasks: this.workerNodes.reduce(
366 (accumulator, workerNode) =>
367 accumulator + workerNode.usage.tasks.failed,
368 0
369 ),
370 ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
371 .runTime.aggregate === true && {
372 runTime: {
373 minimum: round(
374 min(
375 ...this.workerNodes.map(
376 workerNode => workerNode.usage.runTime.minimum ?? Infinity
377 )
378 )
379 ),
380 maximum: round(
381 max(
382 ...this.workerNodes.map(
383 workerNode => workerNode.usage.runTime.maximum ?? -Infinity
384 )
385 )
386 ),
387 ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
388 .runTime.average && {
389 average: round(
390 average(
391 this.workerNodes.reduce<number[]>(
392 (accumulator, workerNode) =>
393 accumulator.concat(workerNode.usage.runTime.history),
394 []
395 )
396 )
397 )
398 }),
399 ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
400 .runTime.median && {
401 median: round(
402 median(
403 this.workerNodes.reduce<number[]>(
404 (accumulator, workerNode) =>
405 accumulator.concat(workerNode.usage.runTime.history),
406 []
407 )
408 )
409 )
410 })
411 }
412 }),
413 ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
414 .waitTime.aggregate === true && {
415 waitTime: {
416 minimum: round(
417 min(
418 ...this.workerNodes.map(
419 workerNode => workerNode.usage.waitTime.minimum ?? Infinity
420 )
421 )
422 ),
423 maximum: round(
424 max(
425 ...this.workerNodes.map(
426 workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
427 )
428 )
429 ),
430 ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
431 .waitTime.average && {
432 average: round(
433 average(
434 this.workerNodes.reduce<number[]>(
435 (accumulator, workerNode) =>
436 accumulator.concat(workerNode.usage.waitTime.history),
437 []
438 )
439 )
440 )
441 }),
442 ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
443 .waitTime.median && {
444 median: round(
445 median(
446 this.workerNodes.reduce<number[]>(
447 (accumulator, workerNode) =>
448 accumulator.concat(workerNode.usage.waitTime.history),
449 []
450 )
451 )
452 )
453 })
454 }
455 })
456 }
457 }
458
459 /**
460 * The pool readiness boolean status.
461 */
462 private get ready (): boolean {
463 if (this.empty) {
464 return false
465 }
466 return (
467 this.workerNodes.reduce(
468 (accumulator, workerNode) =>
469 !workerNode.info.dynamic && workerNode.info.ready
470 ? accumulator + 1
471 : accumulator,
472 0
473 ) >= this.minimumNumberOfWorkers
474 )
475 }
476
477 /**
478 * The pool emptiness boolean status.
479 */
480 protected get empty (): boolean {
481 return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
482 }
483
484 /**
485 * The approximate pool utilization.
486 *
487 * @returns The pool utilization.
488 */
489 private get utilization (): number {
490 const poolTimeCapacity =
491 (performance.now() - this.startTimestamp) *
492 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
493 const totalTasksRunTime = this.workerNodes.reduce(
494 (accumulator, workerNode) =>
495 accumulator + (workerNode.usage.runTime.aggregate ?? 0),
496 0
497 )
498 const totalTasksWaitTime = this.workerNodes.reduce(
499 (accumulator, workerNode) =>
500 accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
501 0
502 )
503 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
504 }
505
506 /**
507 * The pool type.
508 *
509 * If it is `'dynamic'`, it provides the `max` property.
510 */
511 protected abstract get type (): PoolType
512
513 /**
514 * The worker type.
515 */
516 protected abstract get worker (): WorkerType
517
518 /**
519 * Checks if the worker id sent in the received message from a worker is valid.
520 *
521 * @param message - The received message.
522 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
523 */
524 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
525 if (message.workerId == null) {
526 throw new Error('Worker message received without worker id')
527 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
528 throw new Error(
529 `Worker message received from unknown worker '${message.workerId}'`
530 )
531 }
532 }
533
534 /**
535 * Gets the worker node key given its worker id.
536 *
537 * @param workerId - The worker id.
538 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
539 */
540 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
541 return this.workerNodes.findIndex(
542 workerNode => workerNode.info.id === workerId
543 )
544 }
545
546 /** @inheritDoc */
547 public setWorkerChoiceStrategy (
548 workerChoiceStrategy: WorkerChoiceStrategy,
549 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
550 ): void {
551 let requireSync = false
552 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
553 if (workerChoiceStrategyOptions != null) {
554 requireSync = this.setWorkerChoiceStrategyOptions(
555 workerChoiceStrategyOptions
556 )
557 }
558 if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) {
559 this.opts.workerChoiceStrategy = workerChoiceStrategy
560 this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
561 this.opts.workerChoiceStrategy,
562 this.opts.workerChoiceStrategyOptions
563 )
564 requireSync = true
565 }
566 if (requireSync) {
567 this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
568 this.getWorkerWorkerChoiceStrategies(),
569 this.opts.workerChoiceStrategyOptions
570 )
571 for (const workerNodeKey of this.workerNodes.keys()) {
572 this.sendStatisticsMessageToWorker(workerNodeKey)
573 }
574 }
575 }
576
577 /** @inheritDoc */
578 public setWorkerChoiceStrategyOptions (
579 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
580 ): boolean {
581 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
582 if (workerChoiceStrategyOptions != null) {
583 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
584 this.workerChoiceStrategiesContext?.setOptions(
585 this.opts.workerChoiceStrategyOptions
586 )
587 this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
588 this.getWorkerWorkerChoiceStrategies(),
589 this.opts.workerChoiceStrategyOptions
590 )
591 for (const workerNodeKey of this.workerNodes.keys()) {
592 this.sendStatisticsMessageToWorker(workerNodeKey)
593 }
594 return true
595 }
596 return false
597 }
598
599 /** @inheritDoc */
600 public enableTasksQueue (
601 enable: boolean,
602 tasksQueueOptions?: TasksQueueOptions
603 ): void {
604 if (this.opts.enableTasksQueue === true && !enable) {
605 this.unsetTaskStealing()
606 this.unsetTasksStealingOnBackPressure()
607 this.flushTasksQueues()
608 }
609 this.opts.enableTasksQueue = enable
610 this.setTasksQueueOptions(tasksQueueOptions)
611 }
612
613 /** @inheritDoc */
614 public setTasksQueueOptions (
615 tasksQueueOptions: TasksQueueOptions | undefined
616 ): void {
617 if (this.opts.enableTasksQueue === true) {
618 checkValidTasksQueueOptions(tasksQueueOptions)
619 this.opts.tasksQueueOptions =
620 this.buildTasksQueueOptions(tasksQueueOptions)
621 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
622 this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
623 if (this.opts.tasksQueueOptions.taskStealing === true) {
624 this.unsetTaskStealing()
625 this.setTaskStealing()
626 } else {
627 this.unsetTaskStealing()
628 }
629 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
630 this.unsetTasksStealingOnBackPressure()
631 this.setTasksStealingOnBackPressure()
632 } else {
633 this.unsetTasksStealingOnBackPressure()
634 }
635 } else if (this.opts.tasksQueueOptions != null) {
636 delete this.opts.tasksQueueOptions
637 }
638 }
639
640 private buildTasksQueueOptions (
641 tasksQueueOptions: TasksQueueOptions | undefined
642 ): TasksQueueOptions {
643 return {
644 ...getDefaultTasksQueueOptions(
645 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
646 ),
647 ...tasksQueueOptions
648 }
649 }
650
651 private setTasksQueueSize (size: number): void {
652 for (const workerNode of this.workerNodes) {
653 workerNode.tasksQueueBackPressureSize = size
654 }
655 }
656
657 private setTaskStealing (): void {
658 for (const workerNodeKey of this.workerNodes.keys()) {
659 this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
660 }
661 }
662
663 private unsetTaskStealing (): void {
664 for (const workerNodeKey of this.workerNodes.keys()) {
665 this.workerNodes[workerNodeKey].off(
666 'idle',
667 this.handleWorkerNodeIdleEvent
668 )
669 }
670 }
671
672 private setTasksStealingOnBackPressure (): void {
673 for (const workerNodeKey of this.workerNodes.keys()) {
674 this.workerNodes[workerNodeKey].on(
675 'backPressure',
676 this.handleWorkerNodeBackPressureEvent
677 )
678 }
679 }
680
681 private unsetTasksStealingOnBackPressure (): void {
682 for (const workerNodeKey of this.workerNodes.keys()) {
683 this.workerNodes[workerNodeKey].off(
684 'backPressure',
685 this.handleWorkerNodeBackPressureEvent
686 )
687 }
688 }
689
690 /**
691 * Whether the pool is full or not.
692 *
693 * The pool filling boolean status.
694 */
695 protected get full (): boolean {
696 return (
697 this.workerNodes.length >=
698 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
699 )
700 }
701
702 /**
703 * Whether the pool is busy or not.
704 *
705 * The pool busyness boolean status.
706 */
707 protected abstract get busy (): boolean
708
709 /**
710 * Whether worker nodes are executing concurrently their tasks quota or not.
711 *
712 * @returns Worker nodes busyness boolean status.
713 */
714 protected internalBusy (): boolean {
715 if (this.opts.enableTasksQueue === true) {
716 return (
717 this.workerNodes.findIndex(
718 workerNode =>
719 workerNode.info.ready &&
720 workerNode.usage.tasks.executing <
721 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
722 this.opts.tasksQueueOptions!.concurrency!
723 ) === -1
724 )
725 }
726 return (
727 this.workerNodes.findIndex(
728 workerNode =>
729 workerNode.info.ready && workerNode.usage.tasks.executing === 0
730 ) === -1
731 )
732 }
733
734 private isWorkerNodeBusy (workerNodeKey: number): boolean {
735 if (this.opts.enableTasksQueue === true) {
736 return (
737 this.workerNodes[workerNodeKey].usage.tasks.executing >=
738 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
739 this.opts.tasksQueueOptions!.concurrency!
740 )
741 }
742 return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
743 }
744
745 private async sendTaskFunctionOperationToWorker (
746 workerNodeKey: number,
747 message: MessageValue<Data>
748 ): Promise<boolean> {
749 return await new Promise<boolean>((resolve, reject) => {
750 const taskFunctionOperationListener = (
751 message: MessageValue<Response>
752 ): void => {
753 this.checkMessageWorkerId(message)
754 const workerId = this.getWorkerInfo(workerNodeKey)?.id
755 if (
756 message.taskFunctionOperationStatus != null &&
757 message.workerId === workerId
758 ) {
759 if (message.taskFunctionOperationStatus) {
760 resolve(true)
761 } else {
762 reject(
763 new Error(
764 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
765 )
766 )
767 }
768 this.deregisterWorkerMessageListener(
769 this.getWorkerNodeKeyByWorkerId(message.workerId),
770 taskFunctionOperationListener
771 )
772 }
773 }
774 this.registerWorkerMessageListener(
775 workerNodeKey,
776 taskFunctionOperationListener
777 )
778 this.sendToWorker(workerNodeKey, message)
779 })
780 }
781
782 private async sendTaskFunctionOperationToWorkers (
783 message: MessageValue<Data>
784 ): Promise<boolean> {
785 return await new Promise<boolean>((resolve, reject) => {
786 const responsesReceived = new Array<MessageValue<Response>>()
787 const taskFunctionOperationsListener = (
788 message: MessageValue<Response>
789 ): void => {
790 this.checkMessageWorkerId(message)
791 if (message.taskFunctionOperationStatus != null) {
792 responsesReceived.push(message)
793 if (responsesReceived.length === this.workerNodes.length) {
794 if (
795 responsesReceived.every(
796 message => message.taskFunctionOperationStatus === true
797 )
798 ) {
799 resolve(true)
800 } else if (
801 responsesReceived.some(
802 message => message.taskFunctionOperationStatus === false
803 )
804 ) {
805 const errorResponse = responsesReceived.find(
806 response => response.taskFunctionOperationStatus === false
807 )
808 reject(
809 new Error(
810 `Task function operation '${
811 message.taskFunctionOperation as string
812 }' failed on worker ${errorResponse?.workerId} with error: '${
813 errorResponse?.workerError?.message
814 }'`
815 )
816 )
817 }
818 this.deregisterWorkerMessageListener(
819 this.getWorkerNodeKeyByWorkerId(message.workerId),
820 taskFunctionOperationsListener
821 )
822 }
823 }
824 }
825 for (const workerNodeKey of this.workerNodes.keys()) {
826 this.registerWorkerMessageListener(
827 workerNodeKey,
828 taskFunctionOperationsListener
829 )
830 this.sendToWorker(workerNodeKey, message)
831 }
832 })
833 }
834
835 /** @inheritDoc */
836 public hasTaskFunction (name: string): boolean {
837 return this.listTaskFunctionsProperties().some(
838 taskFunctionProperties => taskFunctionProperties.name === name
839 )
840 }
841
842 /** @inheritDoc */
843 public async addTaskFunction (
844 name: string,
845 fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
846 ): Promise<boolean> {
847 if (typeof name !== 'string') {
848 throw new TypeError('name argument must be a string')
849 }
850 if (typeof name === 'string' && name.trim().length === 0) {
851 throw new TypeError('name argument must not be an empty string')
852 }
853 if (typeof fn === 'function') {
854 fn = { taskFunction: fn } satisfies TaskFunctionObject<Data, Response>
855 }
856 if (typeof fn.taskFunction !== 'function') {
857 throw new TypeError('taskFunction property must be a function')
858 }
859 checkValidPriority(fn.priority)
860 checkValidWorkerChoiceStrategy(fn.strategy)
861 const opResult = await this.sendTaskFunctionOperationToWorkers({
862 taskFunctionOperation: 'add',
863 taskFunctionProperties: buildTaskFunctionProperties(name, fn),
864 taskFunction: fn.taskFunction.toString()
865 })
866 this.taskFunctions.set(name, fn)
867 this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
868 this.getWorkerWorkerChoiceStrategies()
869 )
870 return opResult
871 }
872
873 /** @inheritDoc */
874 public async removeTaskFunction (name: string): Promise<boolean> {
875 if (!this.taskFunctions.has(name)) {
876 throw new Error(
877 'Cannot remove a task function not handled on the pool side'
878 )
879 }
880 const opResult = await this.sendTaskFunctionOperationToWorkers({
881 taskFunctionOperation: 'remove',
882 taskFunctionProperties: buildTaskFunctionProperties(
883 name,
884 this.taskFunctions.get(name)
885 )
886 })
887 this.deleteTaskFunctionWorkerUsages(name)
888 this.taskFunctions.delete(name)
889 this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
890 this.getWorkerWorkerChoiceStrategies()
891 )
892 return opResult
893 }
894
895 /** @inheritDoc */
896 public listTaskFunctionsProperties (): TaskFunctionProperties[] {
897 for (const workerNode of this.workerNodes) {
898 if (
899 Array.isArray(workerNode.info.taskFunctionsProperties) &&
900 workerNode.info.taskFunctionsProperties.length > 0
901 ) {
902 return workerNode.info.taskFunctionsProperties
903 }
904 }
905 return []
906 }
907
908 /**
909 * Gets task function strategy, if any.
910 *
911 * @param name - The task function name.
912 * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
913 */
914 private readonly getTaskFunctionWorkerWorkerChoiceStrategy = (
915 name?: string
916 ): WorkerChoiceStrategy | undefined => {
917 if (name != null) {
918 return this.listTaskFunctionsProperties().find(
919 (taskFunctionProperties: TaskFunctionProperties) =>
920 taskFunctionProperties.name === name
921 )?.strategy
922 }
923 }
924
925 /**
926 * Gets worker node task function priority, if any.
927 *
928 * @param workerNodeKey - The worker node key.
929 * @param name - The task function name.
930 * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
931 */
932 private readonly getWorkerNodeTaskFunctionPriority = (
933 workerNodeKey: number,
934 name?: string
935 ): number | undefined => {
936 if (name != null) {
937 return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
938 (taskFunctionProperties: TaskFunctionProperties) =>
939 taskFunctionProperties.name === name
940 )?.priority
941 }
942 }
943
944 /**
945 * Gets the worker choice strategies registered in this pool.
946 *
947 * @returns The worker choice strategies.
948 */
949 private readonly getWorkerWorkerChoiceStrategies =
950 (): Set<WorkerChoiceStrategy> => {
951 return new Set([
952 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
953 this.opts.workerChoiceStrategy!,
954 ...(this.listTaskFunctionsProperties()
955 .map(
956 (taskFunctionProperties: TaskFunctionProperties) =>
957 taskFunctionProperties.strategy
958 )
959 .filter(
960 (strategy: WorkerChoiceStrategy | undefined) => strategy != null
961 ) as WorkerChoiceStrategy[])
962 ])
963 }
964
965 /** @inheritDoc */
966 public async setDefaultTaskFunction (name: string): Promise<boolean> {
967 return await this.sendTaskFunctionOperationToWorkers({
968 taskFunctionOperation: 'default',
969 taskFunctionProperties: buildTaskFunctionProperties(
970 name,
971 this.taskFunctions.get(name)
972 )
973 })
974 }
975
976 private deleteTaskFunctionWorkerUsages (name: string): void {
977 for (const workerNode of this.workerNodes) {
978 workerNode.deleteTaskFunctionWorkerUsage(name)
979 }
980 }
981
982 private shallExecuteTask (workerNodeKey: number): boolean {
983 return (
984 this.tasksQueueSize(workerNodeKey) === 0 &&
985 this.workerNodes[workerNodeKey].usage.tasks.executing <
986 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
987 this.opts.tasksQueueOptions!.concurrency!
988 )
989 }
990
991 /** @inheritDoc */
992 public async execute (
993 data?: Data,
994 name?: string,
995 transferList?: readonly TransferListItem[]
996 ): Promise<Response> {
997 return await new Promise<Response>((resolve, reject) => {
998 if (!this.started) {
999 reject(new Error('Cannot execute a task on not started pool'))
1000 return
1001 }
1002 if (this.destroying) {
1003 reject(new Error('Cannot execute a task on destroying pool'))
1004 return
1005 }
1006 if (name != null && typeof name !== 'string') {
1007 reject(new TypeError('name argument must be a string'))
1008 return
1009 }
1010 if (
1011 name != null &&
1012 typeof name === 'string' &&
1013 name.trim().length === 0
1014 ) {
1015 reject(new TypeError('name argument must not be an empty string'))
1016 return
1017 }
1018 if (transferList != null && !Array.isArray(transferList)) {
1019 reject(new TypeError('transferList argument must be an array'))
1020 return
1021 }
1022 const timestamp = performance.now()
1023 const taskFunctionStrategy =
1024 this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
1025 const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
1026 const task: Task<Data> = {
1027 name: name ?? DEFAULT_TASK_NAME,
1028 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
1029 data: data ?? ({} as Data),
1030 priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
1031 strategy: taskFunctionStrategy,
1032 transferList,
1033 timestamp,
1034 taskId: randomUUID()
1035 }
1036 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1037 this.promiseResponseMap.set(task.taskId!, {
1038 resolve,
1039 reject,
1040 workerNodeKey,
1041 ...(this.emitter != null && {
1042 asyncResource: new AsyncResource('poolifier:task', {
1043 triggerAsyncId: this.emitter.asyncId,
1044 requireManualDestroy: true
1045 })
1046 })
1047 })
1048 if (
1049 this.opts.enableTasksQueue === false ||
1050 (this.opts.enableTasksQueue === true &&
1051 this.shallExecuteTask(workerNodeKey))
1052 ) {
1053 this.executeTask(workerNodeKey, task)
1054 } else {
1055 this.enqueueTask(workerNodeKey, task)
1056 }
1057 })
1058 }
1059
1060 /**
1061 * Starts the minimum number of workers.
1062 */
1063 private startMinimumNumberOfWorkers (): void {
1064 this.startingMinimumNumberOfWorkers = true
1065 while (
1066 this.workerNodes.reduce(
1067 (accumulator, workerNode) =>
1068 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
1069 0
1070 ) < this.minimumNumberOfWorkers
1071 ) {
1072 this.createAndSetupWorkerNode()
1073 }
1074 this.startingMinimumNumberOfWorkers = false
1075 }
1076
1077 /** @inheritdoc */
1078 public start (): void {
1079 if (this.started) {
1080 throw new Error('Cannot start an already started pool')
1081 }
1082 if (this.starting) {
1083 throw new Error('Cannot start an already starting pool')
1084 }
1085 if (this.destroying) {
1086 throw new Error('Cannot start a destroying pool')
1087 }
1088 this.starting = true
1089 this.startMinimumNumberOfWorkers()
1090 this.starting = false
1091 this.started = true
1092 }
1093
1094 /** @inheritDoc */
1095 public async destroy (): Promise<void> {
1096 if (!this.started) {
1097 throw new Error('Cannot destroy an already destroyed pool')
1098 }
1099 if (this.starting) {
1100 throw new Error('Cannot destroy an starting pool')
1101 }
1102 if (this.destroying) {
1103 throw new Error('Cannot destroy an already destroying pool')
1104 }
1105 this.destroying = true
1106 await Promise.all(
1107 this.workerNodes.map(async (_workerNode, workerNodeKey) => {
1108 await this.destroyWorkerNode(workerNodeKey)
1109 })
1110 )
1111 this.emitter?.emit(PoolEvents.destroy, this.info)
1112 this.emitter?.emitDestroy()
1113 this.readyEventEmitted = false
1114 this.destroying = false
1115 this.started = false
1116 }
1117
1118 private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
1119 await new Promise<void>((resolve, reject) => {
1120 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1121 if (this.workerNodes[workerNodeKey] == null) {
1122 resolve()
1123 return
1124 }
1125 const killMessageListener = (message: MessageValue<Response>): void => {
1126 this.checkMessageWorkerId(message)
1127 if (message.kill === 'success') {
1128 resolve()
1129 } else if (message.kill === 'failure') {
1130 reject(
1131 new Error(
1132 `Kill message handling failed on worker ${message.workerId}`
1133 )
1134 )
1135 }
1136 }
1137 // FIXME: should be registered only once
1138 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
1139 this.sendToWorker(workerNodeKey, { kill: true })
1140 })
1141 }
1142
1143 /**
1144 * Terminates the worker node given its worker node key.
1145 *
1146 * @param workerNodeKey - The worker node key.
1147 */
1148 protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
1149 this.flagWorkerNodeAsNotReady(workerNodeKey)
1150 const flushedTasks = this.flushTasksQueue(workerNodeKey)
1151 const workerNode = this.workerNodes[workerNodeKey]
1152 await waitWorkerNodeEvents(
1153 workerNode,
1154 'taskFinished',
1155 flushedTasks,
1156 this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
1157 getDefaultTasksQueueOptions(
1158 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
1159 ).tasksFinishedTimeout
1160 )
1161 await this.sendKillMessageToWorker(workerNodeKey)
1162 await workerNode.terminate()
1163 }
1164
1165 /**
1166 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1167 * Can be overridden.
1168 *
1169 * @virtual
1170 */
1171 protected setupHook (): void {
1172 /* Intentionally empty */
1173 }
1174
1175 /**
1176 * Returns whether the worker is the main worker or not.
1177 *
1178 * @returns `true` if the worker is the main worker, `false` otherwise.
1179 */
1180 protected abstract isMain (): boolean
1181
1182 /**
1183 * Hook executed before the worker task execution.
1184 * Can be overridden.
1185 *
1186 * @param workerNodeKey - The worker node key.
1187 * @param task - The task to execute.
1188 */
1189 protected beforeTaskExecutionHook (
1190 workerNodeKey: number,
1191 task: Task<Data>
1192 ): void {
1193 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1194 if (this.workerNodes[workerNodeKey]?.usage != null) {
1195 const workerUsage = this.workerNodes[workerNodeKey].usage
1196 ++workerUsage.tasks.executing
1197 updateWaitTimeWorkerUsage(
1198 this.workerChoiceStrategiesContext,
1199 workerUsage,
1200 task
1201 )
1202 }
1203 if (
1204 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1205 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1206 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) !=
1207 null
1208 ) {
1209 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1210 const taskFunctionWorkerUsage = this.workerNodes[
1211 workerNodeKey
1212 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1213 ].getTaskFunctionWorkerUsage(task.name!)!
1214 ++taskFunctionWorkerUsage.tasks.executing
1215 updateWaitTimeWorkerUsage(
1216 this.workerChoiceStrategiesContext,
1217 taskFunctionWorkerUsage,
1218 task
1219 )
1220 }
1221 }
1222
1223 /**
1224 * Hook executed after the worker task execution.
1225 * Can be overridden.
1226 *
1227 * @param workerNodeKey - The worker node key.
1228 * @param message - The received message.
1229 */
1230 protected afterTaskExecutionHook (
1231 workerNodeKey: number,
1232 message: MessageValue<Response>
1233 ): void {
1234 let needWorkerChoiceStrategyUpdate = false
1235 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1236 if (this.workerNodes[workerNodeKey]?.usage != null) {
1237 const workerUsage = this.workerNodes[workerNodeKey].usage
1238 updateTaskStatisticsWorkerUsage(workerUsage, message)
1239 updateRunTimeWorkerUsage(
1240 this.workerChoiceStrategiesContext,
1241 workerUsage,
1242 message
1243 )
1244 updateEluWorkerUsage(
1245 this.workerChoiceStrategiesContext,
1246 workerUsage,
1247 message
1248 )
1249 needWorkerChoiceStrategyUpdate = true
1250 }
1251 if (
1252 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1253 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1254 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1255 message.taskPerformance!.name
1256 ) != null
1257 ) {
1258 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1259 const taskFunctionWorkerUsage = this.workerNodes[
1260 workerNodeKey
1261 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1262 ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
1263 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1264 updateRunTimeWorkerUsage(
1265 this.workerChoiceStrategiesContext,
1266 taskFunctionWorkerUsage,
1267 message
1268 )
1269 updateEluWorkerUsage(
1270 this.workerChoiceStrategiesContext,
1271 taskFunctionWorkerUsage,
1272 message
1273 )
1274 needWorkerChoiceStrategyUpdate = true
1275 }
1276 if (needWorkerChoiceStrategyUpdate) {
1277 this.workerChoiceStrategiesContext?.update(workerNodeKey)
1278 }
1279 }
1280
1281 /**
1282 * Whether the worker node shall update its task function worker usage or not.
1283 *
1284 * @param workerNodeKey - The worker node key.
1285 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1286 */
1287 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
1288 const workerInfo = this.getWorkerInfo(workerNodeKey)
1289 return (
1290 workerInfo != null &&
1291 Array.isArray(workerInfo.taskFunctionsProperties) &&
1292 workerInfo.taskFunctionsProperties.length > 2
1293 )
1294 }
1295
1296 /**
1297 * Chooses a worker node for the next task.
1298 *
1299 * @param workerChoiceStrategy - The worker choice strategy.
1300 * @returns The chosen worker node key
1301 */
1302 private chooseWorkerNode (
1303 workerChoiceStrategy?: WorkerChoiceStrategy
1304 ): number {
1305 if (this.shallCreateDynamicWorker()) {
1306 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1307 if (
1308 this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
1309 true
1310 ) {
1311 return workerNodeKey
1312 }
1313 }
1314 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1315 return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
1316 }
1317
1318 /**
1319 * Conditions for dynamic worker creation.
1320 *
1321 * @returns Whether to create a dynamic worker or not.
1322 */
1323 protected abstract shallCreateDynamicWorker (): boolean
1324
1325 /**
1326 * Sends a message to worker given its worker node key.
1327 *
1328 * @param workerNodeKey - The worker node key.
1329 * @param message - The message.
1330 * @param transferList - The optional array of transferable objects.
1331 */
1332 protected abstract sendToWorker (
1333 workerNodeKey: number,
1334 message: MessageValue<Data>,
1335 transferList?: readonly TransferListItem[]
1336 ): void
1337
1338 /**
1339 * Creates a new, completely set up worker node.
1340 *
1341 * @returns New, completely set up worker node key.
1342 */
1343 protected createAndSetupWorkerNode (): number {
1344 const workerNode = this.createWorkerNode()
1345 workerNode.registerWorkerEventHandler(
1346 'online',
1347 this.opts.onlineHandler ?? EMPTY_FUNCTION
1348 )
1349 workerNode.registerWorkerEventHandler(
1350 'message',
1351 this.opts.messageHandler ?? EMPTY_FUNCTION
1352 )
1353 workerNode.registerWorkerEventHandler(
1354 'error',
1355 this.opts.errorHandler ?? EMPTY_FUNCTION
1356 )
1357 workerNode.registerOnceWorkerEventHandler('error', (error: Error) => {
1358 workerNode.info.ready = false
1359 this.emitter?.emit(PoolEvents.error, error)
1360 if (
1361 this.started &&
1362 !this.destroying &&
1363 this.opts.restartWorkerOnError === true
1364 ) {
1365 if (workerNode.info.dynamic) {
1366 this.createAndSetupDynamicWorkerNode()
1367 } else if (!this.startingMinimumNumberOfWorkers) {
1368 this.startMinimumNumberOfWorkers()
1369 }
1370 }
1371 if (
1372 this.started &&
1373 !this.destroying &&
1374 this.opts.enableTasksQueue === true
1375 ) {
1376 this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
1377 }
1378 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1379 workerNode?.terminate().catch((error: unknown) => {
1380 this.emitter?.emit(PoolEvents.error, error)
1381 })
1382 })
1383 workerNode.registerWorkerEventHandler(
1384 'exit',
1385 this.opts.exitHandler ?? EMPTY_FUNCTION
1386 )
1387 workerNode.registerOnceWorkerEventHandler('exit', () => {
1388 this.removeWorkerNode(workerNode)
1389 if (
1390 this.started &&
1391 !this.startingMinimumNumberOfWorkers &&
1392 !this.destroying
1393 ) {
1394 this.startMinimumNumberOfWorkers()
1395 }
1396 })
1397 const workerNodeKey = this.addWorkerNode(workerNode)
1398 this.afterWorkerNodeSetup(workerNodeKey)
1399 return workerNodeKey
1400 }
1401
1402 /**
1403 * Creates a new, completely set up dynamic worker node.
1404 *
1405 * @returns New, completely set up dynamic worker node key.
1406 */
1407 protected createAndSetupDynamicWorkerNode (): number {
1408 const workerNodeKey = this.createAndSetupWorkerNode()
1409 this.registerWorkerMessageListener(workerNodeKey, message => {
1410 this.checkMessageWorkerId(message)
1411 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1412 message.workerId
1413 )
1414 const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
1415 // Kill message received from worker
1416 if (
1417 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1418 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1419 ((this.opts.enableTasksQueue === false &&
1420 workerUsage.tasks.executing === 0) ||
1421 (this.opts.enableTasksQueue === true &&
1422 workerUsage.tasks.executing === 0 &&
1423 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1424 ) {
1425 // Flag the worker node as not ready immediately
1426 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
1427 this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
1428 this.emitter?.emit(PoolEvents.error, error)
1429 })
1430 }
1431 })
1432 this.sendToWorker(workerNodeKey, {
1433 checkActive: true
1434 })
1435 if (this.taskFunctions.size > 0) {
1436 for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
1437 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1438 taskFunctionOperation: 'add',
1439 taskFunctionProperties: buildTaskFunctionProperties(
1440 taskFunctionName,
1441 taskFunctionObject
1442 ),
1443 taskFunction: taskFunctionObject.taskFunction.toString()
1444 }).catch((error: unknown) => {
1445 this.emitter?.emit(PoolEvents.error, error)
1446 })
1447 }
1448 }
1449 const workerNode = this.workerNodes[workerNodeKey]
1450 workerNode.info.dynamic = true
1451 if (
1452 this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady ===
1453 true ||
1454 this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
1455 true
1456 ) {
1457 workerNode.info.ready = true
1458 }
1459 this.checkAndEmitDynamicWorkerCreationEvents()
1460 return workerNodeKey
1461 }
1462
1463 /**
1464 * Registers a listener callback on the worker given its worker node key.
1465 *
1466 * @param workerNodeKey - The worker node key.
1467 * @param listener - The message listener callback.
1468 */
1469 protected abstract registerWorkerMessageListener<
1470 Message extends Data | Response
1471 >(
1472 workerNodeKey: number,
1473 listener: (message: MessageValue<Message>) => void
1474 ): void
1475
1476 /**
1477 * Registers once a listener callback on the worker given its worker node key.
1478 *
1479 * @param workerNodeKey - The worker node key.
1480 * @param listener - The message listener callback.
1481 */
1482 protected abstract registerOnceWorkerMessageListener<
1483 Message extends Data | Response
1484 >(
1485 workerNodeKey: number,
1486 listener: (message: MessageValue<Message>) => void
1487 ): void
1488
1489 /**
1490 * Deregisters a listener callback on the worker given its worker node key.
1491 *
1492 * @param workerNodeKey - The worker node key.
1493 * @param listener - The message listener callback.
1494 */
1495 protected abstract deregisterWorkerMessageListener<
1496 Message extends Data | Response
1497 >(
1498 workerNodeKey: number,
1499 listener: (message: MessageValue<Message>) => void
1500 ): void
1501
1502 /**
1503 * Method hooked up after a worker node has been newly created.
1504 * Can be overridden.
1505 *
1506 * @param workerNodeKey - The newly created worker node key.
1507 */
1508 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1509 // Listen to worker messages.
1510 this.registerWorkerMessageListener(
1511 workerNodeKey,
1512 this.workerMessageListener
1513 )
1514 // Send the startup message to worker.
1515 this.sendStartupMessageToWorker(workerNodeKey)
1516 // Send the statistics message to worker.
1517 this.sendStatisticsMessageToWorker(workerNodeKey)
1518 if (this.opts.enableTasksQueue === true) {
1519 if (this.opts.tasksQueueOptions?.taskStealing === true) {
1520 this.workerNodes[workerNodeKey].on(
1521 'idle',
1522 this.handleWorkerNodeIdleEvent
1523 )
1524 }
1525 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1526 this.workerNodes[workerNodeKey].on(
1527 'backPressure',
1528 this.handleWorkerNodeBackPressureEvent
1529 )
1530 }
1531 }
1532 }
1533
1534 /**
1535 * Sends the startup message to worker given its worker node key.
1536 *
1537 * @param workerNodeKey - The worker node key.
1538 */
1539 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1540
1541 /**
1542 * Sends the statistics message to worker given its worker node key.
1543 *
1544 * @param workerNodeKey - The worker node key.
1545 */
1546 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1547 this.sendToWorker(workerNodeKey, {
1548 statistics: {
1549 runTime:
1550 this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
1551 .runTime.aggregate ?? false,
1552 elu:
1553 this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
1554 .elu.aggregate ?? false
1555 }
1556 })
1557 }
1558
1559 private cannotStealTask (): boolean {
1560 return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
1561 }
1562
1563 private handleTask (workerNodeKey: number, task: Task<Data>): void {
1564 if (this.shallExecuteTask(workerNodeKey)) {
1565 this.executeTask(workerNodeKey, task)
1566 } else {
1567 this.enqueueTask(workerNodeKey, task)
1568 }
1569 }
1570
1571 private redistributeQueuedTasks (workerNodeKey: number): void {
1572 if (workerNodeKey === -1 || this.cannotStealTask()) {
1573 return
1574 }
1575 while (this.tasksQueueSize(workerNodeKey) > 0) {
1576 const destinationWorkerNodeKey = this.workerNodes.reduce(
1577 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1578 return workerNode.info.ready &&
1579 workerNode.usage.tasks.queued <
1580 workerNodes[minWorkerNodeKey].usage.tasks.queued
1581 ? workerNodeKey
1582 : minWorkerNodeKey
1583 },
1584 0
1585 )
1586 this.handleTask(
1587 destinationWorkerNodeKey,
1588 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1589 this.dequeueTask(workerNodeKey)!
1590 )
1591 }
1592 }
1593
1594 private updateTaskStolenStatisticsWorkerUsage (
1595 workerNodeKey: number,
1596 taskName: string
1597 ): void {
1598 const workerNode = this.workerNodes[workerNodeKey]
1599 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1600 if (workerNode?.usage != null) {
1601 ++workerNode.usage.tasks.stolen
1602 }
1603 if (
1604 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1605 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1606 ) {
1607 const taskFunctionWorkerUsage =
1608 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1609 workerNode.getTaskFunctionWorkerUsage(taskName)!
1610 ++taskFunctionWorkerUsage.tasks.stolen
1611 }
1612 }
1613
1614 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1615 workerNodeKey: number
1616 ): void {
1617 const workerNode = this.workerNodes[workerNodeKey]
1618 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1619 if (workerNode?.usage != null) {
1620 ++workerNode.usage.tasks.sequentiallyStolen
1621 }
1622 }
1623
1624 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1625 workerNodeKey: number,
1626 taskName: string
1627 ): void {
1628 const workerNode = this.workerNodes[workerNodeKey]
1629 if (
1630 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1631 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1632 ) {
1633 const taskFunctionWorkerUsage =
1634 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1635 workerNode.getTaskFunctionWorkerUsage(taskName)!
1636 ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
1637 }
1638 }
1639
1640 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1641 workerNodeKey: number
1642 ): void {
1643 const workerNode = this.workerNodes[workerNodeKey]
1644 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1645 if (workerNode?.usage != null) {
1646 workerNode.usage.tasks.sequentiallyStolen = 0
1647 }
1648 }
1649
1650 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1651 workerNodeKey: number,
1652 taskName: string
1653 ): void {
1654 const workerNode = this.workerNodes[workerNodeKey]
1655 if (
1656 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1657 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1658 ) {
1659 const taskFunctionWorkerUsage =
1660 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1661 workerNode.getTaskFunctionWorkerUsage(taskName)!
1662 taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
1663 }
1664 }
1665
1666 private readonly handleWorkerNodeIdleEvent = (
1667 eventDetail: WorkerNodeEventDetail,
1668 previousStolenTask?: Task<Data>
1669 ): void => {
1670 const { workerNodeKey } = eventDetail
1671 if (workerNodeKey == null) {
1672 throw new Error(
1673 "WorkerNode event detail 'workerNodeKey' property must be defined"
1674 )
1675 }
1676 const workerInfo = this.getWorkerInfo(workerNodeKey)
1677 if (
1678 this.cannotStealTask() ||
1679 (this.info.stealingWorkerNodes ?? 0) >
1680 Math.floor(this.workerNodes.length / 2)
1681 ) {
1682 if (workerInfo != null && previousStolenTask != null) {
1683 workerInfo.stealing = false
1684 }
1685 return
1686 }
1687 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1688 if (
1689 workerInfo != null &&
1690 previousStolenTask != null &&
1691 workerNodeTasksUsage.sequentiallyStolen > 0 &&
1692 (workerNodeTasksUsage.executing > 0 ||
1693 this.tasksQueueSize(workerNodeKey) > 0)
1694 ) {
1695 workerInfo.stealing = false
1696 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1697 for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) {
1698 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1699 workerNodeKey,
1700 taskFunctionProperties.name
1701 )
1702 }
1703 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
1704 return
1705 }
1706 if (workerInfo == null) {
1707 throw new Error(
1708 `Worker node with key '${workerNodeKey}' not found in pool`
1709 )
1710 }
1711 workerInfo.stealing = true
1712 const stolenTask = this.workerNodeStealTask(workerNodeKey)
1713 if (
1714 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1715 stolenTask != null
1716 ) {
1717 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1718 const taskFunctionTasksWorkerUsage = this.workerNodes[
1719 workerNodeKey
1720 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1721 ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
1722 if (
1723 taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
1724 (previousStolenTask != null &&
1725 previousStolenTask.name === stolenTask.name &&
1726 taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
1727 ) {
1728 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1729 workerNodeKey,
1730 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1731 stolenTask.name!
1732 )
1733 } else {
1734 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1735 workerNodeKey,
1736 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1737 stolenTask.name!
1738 )
1739 }
1740 }
1741 sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
1742 .then(() => {
1743 this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
1744 return undefined
1745 })
1746 .catch((error: unknown) => {
1747 this.emitter?.emit(PoolEvents.error, error)
1748 })
1749 }
1750
1751 private readonly workerNodeStealTask = (
1752 workerNodeKey: number
1753 ): Task<Data> | undefined => {
1754 const workerNodes = this.workerNodes
1755 .slice()
1756 .sort(
1757 (workerNodeA, workerNodeB) =>
1758 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1759 )
1760 const sourceWorkerNode = workerNodes.find(
1761 (sourceWorkerNode, sourceWorkerNodeKey) =>
1762 sourceWorkerNode.info.ready &&
1763 !sourceWorkerNode.info.stealing &&
1764 sourceWorkerNodeKey !== workerNodeKey &&
1765 sourceWorkerNode.usage.tasks.queued > 0
1766 )
1767 if (sourceWorkerNode != null) {
1768 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1769 const task = sourceWorkerNode.dequeueTask(1)!
1770 this.handleTask(workerNodeKey, task)
1771 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
1772 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1773 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
1774 return task
1775 }
1776 }
1777
1778 private readonly handleWorkerNodeBackPressureEvent = (
1779 eventDetail: WorkerNodeEventDetail
1780 ): void => {
1781 if (
1782 this.cannotStealTask() ||
1783 (this.info.stealingWorkerNodes ?? 0) >
1784 Math.floor(this.workerNodes.length / 2)
1785 ) {
1786 return
1787 }
1788 const { workerId } = eventDetail
1789 const sizeOffset = 1
1790 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1791 if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
1792 return
1793 }
1794 const sourceWorkerNode =
1795 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1796 const workerNodes = this.workerNodes
1797 .slice()
1798 .sort(
1799 (workerNodeA, workerNodeB) =>
1800 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1801 )
1802 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1803 if (
1804 sourceWorkerNode.usage.tasks.queued > 0 &&
1805 workerNode.info.ready &&
1806 !workerNode.info.stealing &&
1807 workerNode.info.id !== workerId &&
1808 workerNode.usage.tasks.queued <
1809 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1810 this.opts.tasksQueueOptions!.size! - sizeOffset
1811 ) {
1812 const workerInfo = this.getWorkerInfo(workerNodeKey)
1813 if (workerInfo == null) {
1814 throw new Error(
1815 `Worker node with key '${workerNodeKey}' not found in pool`
1816 )
1817 }
1818 workerInfo.stealing = true
1819 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1820 const task = sourceWorkerNode.dequeueTask(1)!
1821 this.handleTask(workerNodeKey, task)
1822 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1823 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
1824 workerInfo.stealing = false
1825 }
1826 }
1827 }
1828
1829 /**
1830 * This method is the message listener registered on each worker.
1831 */
1832 protected readonly workerMessageListener = (
1833 message: MessageValue<Response>
1834 ): void => {
1835 this.checkMessageWorkerId(message)
1836 const { workerId, ready, taskId, taskFunctionsProperties } = message
1837 if (ready != null && taskFunctionsProperties != null) {
1838 // Worker ready response received from worker
1839 this.handleWorkerReadyResponse(message)
1840 } else if (taskFunctionsProperties != null) {
1841 // Task function properties message received from worker
1842 const workerInfo = this.getWorkerInfo(
1843 this.getWorkerNodeKeyByWorkerId(workerId)
1844 )
1845 if (workerInfo != null) {
1846 workerInfo.taskFunctionsProperties = taskFunctionsProperties
1847 }
1848 } else if (taskId != null) {
1849 // Task execution response received from worker
1850 this.handleTaskExecutionResponse(message)
1851 }
1852 }
1853
1854 private checkAndEmitReadyEvent (): void {
1855 if (!this.readyEventEmitted && this.ready) {
1856 this.emitter?.emit(PoolEvents.ready, this.info)
1857 this.readyEventEmitted = true
1858 }
1859 }
1860
1861 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1862 const { workerId, ready, taskFunctionsProperties } = message
1863 if (ready == null || !ready) {
1864 throw new Error(`Worker ${workerId} failed to initialize`)
1865 }
1866 const workerNode =
1867 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1868 workerNode.info.ready = ready
1869 workerNode.info.taskFunctionsProperties = taskFunctionsProperties
1870 this.checkAndEmitReadyEvent()
1871 }
1872
1873 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1874 const { workerId, taskId, workerError, data } = message
1875 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1876 const promiseResponse = this.promiseResponseMap.get(taskId!)
1877 if (promiseResponse != null) {
1878 const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
1879 const workerNode = this.workerNodes[workerNodeKey]
1880 if (workerError != null) {
1881 this.emitter?.emit(PoolEvents.taskError, workerError)
1882 asyncResource != null
1883 ? asyncResource.runInAsyncScope(
1884 reject,
1885 this.emitter,
1886 workerError.message
1887 )
1888 : reject(workerError.message)
1889 } else {
1890 asyncResource != null
1891 ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
1892 : resolve(data as Response)
1893 }
1894 asyncResource?.emitDestroy()
1895 this.afterTaskExecutionHook(workerNodeKey, message)
1896 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1897 this.promiseResponseMap.delete(taskId!)
1898 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1899 workerNode?.emit('taskFinished', taskId)
1900 if (
1901 this.opts.enableTasksQueue === true &&
1902 !this.destroying &&
1903 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1904 workerNode != null
1905 ) {
1906 const workerNodeTasksUsage = workerNode.usage.tasks
1907 if (
1908 this.tasksQueueSize(workerNodeKey) > 0 &&
1909 workerNodeTasksUsage.executing <
1910 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1911 this.opts.tasksQueueOptions!.concurrency!
1912 ) {
1913 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1914 this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
1915 }
1916 if (
1917 workerNodeTasksUsage.executing === 0 &&
1918 this.tasksQueueSize(workerNodeKey) === 0 &&
1919 workerNodeTasksUsage.sequentiallyStolen === 0
1920 ) {
1921 workerNode.emit('idle', {
1922 workerId,
1923 workerNodeKey
1924 })
1925 }
1926 }
1927 }
1928 }
1929
1930 private checkAndEmitTaskExecutionEvents (): void {
1931 if (this.busy) {
1932 this.emitter?.emit(PoolEvents.busy, this.info)
1933 }
1934 }
1935
1936 private checkAndEmitTaskQueuingEvents (): void {
1937 if (this.hasBackPressure()) {
1938 this.emitter?.emit(PoolEvents.backPressure, this.info)
1939 }
1940 }
1941
1942 /**
1943 * Emits dynamic worker creation events.
1944 */
1945 protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1946
1947 /**
1948 * Gets the worker information given its worker node key.
1949 *
1950 * @param workerNodeKey - The worker node key.
1951 * @returns The worker information.
1952 */
1953 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1954 return this.workerNodes[workerNodeKey]?.info
1955 }
1956
1957 /**
1958 * Creates a worker node.
1959 *
1960 * @returns The created worker node.
1961 */
1962 private createWorkerNode (): IWorkerNode<Worker, Data> {
1963 const workerNode = new WorkerNode<Worker, Data>(
1964 this.worker,
1965 this.filePath,
1966 {
1967 env: this.opts.env,
1968 workerOptions: this.opts.workerOptions,
1969 tasksQueueBackPressureSize:
1970 this.opts.tasksQueueOptions?.size ??
1971 getDefaultTasksQueueOptions(
1972 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
1973 ).size,
1974 tasksQueueBucketSize:
1975 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
1976 }
1977 )
1978 // Flag the worker node as ready at pool startup.
1979 if (this.starting) {
1980 workerNode.info.ready = true
1981 }
1982 return workerNode
1983 }
1984
1985 /**
1986 * Adds the given worker node in the pool worker nodes.
1987 *
1988 * @param workerNode - The worker node.
1989 * @returns The added worker node key.
1990 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1991 */
1992 private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
1993 this.workerNodes.push(workerNode)
1994 const workerNodeKey = this.workerNodes.indexOf(workerNode)
1995 if (workerNodeKey === -1) {
1996 throw new Error('Worker added not found in worker nodes')
1997 }
1998 return workerNodeKey
1999 }
2000
2001 private checkAndEmitEmptyEvent (): void {
2002 if (this.empty) {
2003 this.emitter?.emit(PoolEvents.empty, this.info)
2004 this.readyEventEmitted = false
2005 }
2006 }
2007
2008 /**
2009 * Removes the worker node from the pool worker nodes.
2010 *
2011 * @param workerNode - The worker node.
2012 */
2013 private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
2014 const workerNodeKey = this.workerNodes.indexOf(workerNode)
2015 if (workerNodeKey !== -1) {
2016 this.workerNodes.splice(workerNodeKey, 1)
2017 this.workerChoiceStrategiesContext?.remove(workerNodeKey)
2018 }
2019 this.checkAndEmitEmptyEvent()
2020 }
2021
2022 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
2023 const workerInfo = this.getWorkerInfo(workerNodeKey)
2024 if (workerInfo != null) {
2025 workerInfo.ready = false
2026 }
2027 }
2028
2029 private hasBackPressure (): boolean {
2030 return (
2031 this.opts.enableTasksQueue === true &&
2032 this.workerNodes.findIndex(
2033 workerNode => !workerNode.hasBackPressure()
2034 ) === -1
2035 )
2036 }
2037
2038 /**
2039 * Executes the given task on the worker given its worker node key.
2040 *
2041 * @param workerNodeKey - The worker node key.
2042 * @param task - The task to execute.
2043 */
2044 private executeTask (workerNodeKey: number, task: Task<Data>): void {
2045 this.beforeTaskExecutionHook(workerNodeKey, task)
2046 this.sendToWorker(workerNodeKey, task, task.transferList)
2047 this.checkAndEmitTaskExecutionEvents()
2048 }
2049
2050 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
2051 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
2052 this.checkAndEmitTaskQueuingEvents()
2053 return tasksQueueSize
2054 }
2055
2056 private dequeueTask (
2057 workerNodeKey: number,
2058 bucket?: number
2059 ): Task<Data> | undefined {
2060 return this.workerNodes[workerNodeKey].dequeueTask(bucket)
2061 }
2062
2063 private tasksQueueSize (workerNodeKey: number): number {
2064 return this.workerNodes[workerNodeKey].tasksQueueSize()
2065 }
2066
2067 protected flushTasksQueue (workerNodeKey: number): number {
2068 let flushedTasks = 0
2069 while (this.tasksQueueSize(workerNodeKey) > 0) {
2070 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
2071 this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
2072 ++flushedTasks
2073 }
2074 this.workerNodes[workerNodeKey].clearTasksQueue()
2075 return flushedTasks
2076 }
2077
2078 private flushTasksQueues (): void {
2079 for (const workerNodeKey of this.workerNodes.keys()) {
2080 this.flushTasksQueue(workerNodeKey)
2081 }
2082 }
2083 }