fix: ensure no task can be executed on destroyed pool
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Dynamic pool maximum size property placeholder.
89 */
90 protected readonly max?: number
91
92 /**
93 * Whether the pool is starting or not.
94 */
95 private readonly starting: boolean
96 /**
97 * Whether the pool is started or not.
98 */
99 private started: boolean
100 /**
101 * The start timestamp of the pool.
102 */
103 private readonly startTimestamp
104
105 /**
106 * Constructs a new poolifier pool.
107 *
108 * @param numberOfWorkers - Number of workers that this pool should manage.
109 * @param filePath - Path to the worker file.
110 * @param opts - Options for the pool.
111 */
112 public constructor (
113 protected readonly numberOfWorkers: number,
114 protected readonly filePath: string,
115 protected readonly opts: PoolOptions<Worker>
116 ) {
117 if (!this.isMain()) {
118 throw new Error(
119 'Cannot start a pool from a worker with the same type as the pool'
120 )
121 }
122 this.checkNumberOfWorkers(this.numberOfWorkers)
123 this.checkFilePath(this.filePath)
124 this.checkPoolOptions(this.opts)
125
126 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
127 this.executeTask = this.executeTask.bind(this)
128 this.enqueueTask = this.enqueueTask.bind(this)
129
130 if (this.opts.enableEvents === true) {
131 this.emitter = new PoolEmitter()
132 }
133 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
134 Worker,
135 Data,
136 Response
137 >(
138 this,
139 this.opts.workerChoiceStrategy,
140 this.opts.workerChoiceStrategyOptions
141 )
142
143 this.setupHook()
144
145 this.starting = true
146 this.startPool()
147 this.starting = false
148 this.started = true
149
150 this.startTimestamp = performance.now()
151 }
152
153 private checkFilePath (filePath: string): void {
154 if (
155 filePath == null ||
156 typeof filePath !== 'string' ||
157 (typeof filePath === 'string' && filePath.trim().length === 0)
158 ) {
159 throw new Error('Please specify a file with a worker implementation')
160 }
161 if (!existsSync(filePath)) {
162 throw new Error(`Cannot find the worker file '${filePath}'`)
163 }
164 }
165
166 private checkNumberOfWorkers (numberOfWorkers: number): void {
167 if (numberOfWorkers == null) {
168 throw new Error(
169 'Cannot instantiate a pool without specifying the number of workers'
170 )
171 } else if (!Number.isSafeInteger(numberOfWorkers)) {
172 throw new TypeError(
173 'Cannot instantiate a pool with a non safe integer number of workers'
174 )
175 } else if (numberOfWorkers < 0) {
176 throw new RangeError(
177 'Cannot instantiate a pool with a negative number of workers'
178 )
179 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
180 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
181 }
182 }
183
184 protected checkDynamicPoolSize (min: number, max: number): void {
185 if (this.type === PoolTypes.dynamic) {
186 if (max == null) {
187 throw new TypeError(
188 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
189 )
190 } else if (!Number.isSafeInteger(max)) {
191 throw new TypeError(
192 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
193 )
194 } else if (min > max) {
195 throw new RangeError(
196 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
197 )
198 } else if (max === 0) {
199 throw new RangeError(
200 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
201 )
202 } else if (min === max) {
203 throw new RangeError(
204 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
205 )
206 }
207 }
208 }
209
210 private checkPoolOptions (opts: PoolOptions<Worker>): void {
211 if (isPlainObject(opts)) {
212 this.opts.workerChoiceStrategy =
213 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
214 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
215 this.opts.workerChoiceStrategyOptions = {
216 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
217 ...opts.workerChoiceStrategyOptions
218 }
219 this.checkValidWorkerChoiceStrategyOptions(
220 this.opts.workerChoiceStrategyOptions
221 )
222 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
223 this.opts.enableEvents = opts.enableEvents ?? true
224 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
225 if (this.opts.enableTasksQueue) {
226 this.checkValidTasksQueueOptions(
227 opts.tasksQueueOptions as TasksQueueOptions
228 )
229 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
230 opts.tasksQueueOptions as TasksQueueOptions
231 )
232 }
233 } else {
234 throw new TypeError('Invalid pool options: must be a plain object')
235 }
236 }
237
238 private checkValidWorkerChoiceStrategy (
239 workerChoiceStrategy: WorkerChoiceStrategy
240 ): void {
241 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
242 throw new Error(
243 `Invalid worker choice strategy '${workerChoiceStrategy}'`
244 )
245 }
246 }
247
248 private checkValidWorkerChoiceStrategyOptions (
249 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
250 ): void {
251 if (!isPlainObject(workerChoiceStrategyOptions)) {
252 throw new TypeError(
253 'Invalid worker choice strategy options: must be a plain object'
254 )
255 }
256 if (
257 workerChoiceStrategyOptions.choiceRetries != null &&
258 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
259 ) {
260 throw new TypeError(
261 'Invalid worker choice strategy options: choice retries must be an integer'
262 )
263 }
264 if (
265 workerChoiceStrategyOptions.choiceRetries != null &&
266 workerChoiceStrategyOptions.choiceRetries <= 0
267 ) {
268 throw new RangeError(
269 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
270 )
271 }
272 if (
273 workerChoiceStrategyOptions.weights != null &&
274 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
275 ) {
276 throw new Error(
277 'Invalid worker choice strategy options: must have a weight for each worker node'
278 )
279 }
280 if (
281 workerChoiceStrategyOptions.measurement != null &&
282 !Object.values(Measurements).includes(
283 workerChoiceStrategyOptions.measurement
284 )
285 ) {
286 throw new Error(
287 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
288 )
289 }
290 }
291
292 private checkValidTasksQueueOptions (
293 tasksQueueOptions: TasksQueueOptions
294 ): void {
295 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
296 throw new TypeError('Invalid tasks queue options: must be a plain object')
297 }
298 if (
299 tasksQueueOptions?.concurrency != null &&
300 !Number.isSafeInteger(tasksQueueOptions.concurrency)
301 ) {
302 throw new TypeError(
303 'Invalid worker node tasks concurrency: must be an integer'
304 )
305 }
306 if (
307 tasksQueueOptions?.concurrency != null &&
308 tasksQueueOptions.concurrency <= 0
309 ) {
310 throw new RangeError(
311 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
312 )
313 }
314 if (
315 tasksQueueOptions?.queueMaxSize != null &&
316 !Number.isSafeInteger(tasksQueueOptions.queueMaxSize)
317 ) {
318 throw new TypeError(
319 'Invalid worker node tasks queue max size: must be an integer'
320 )
321 }
322 if (
323 tasksQueueOptions?.queueMaxSize != null &&
324 tasksQueueOptions.queueMaxSize <= 0
325 ) {
326 throw new RangeError(
327 `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero`
328 )
329 }
330 }
331
332 private startPool (): void {
333 while (
334 this.workerNodes.reduce(
335 (accumulator, workerNode) =>
336 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
337 0
338 ) < this.numberOfWorkers
339 ) {
340 this.createAndSetupWorkerNode()
341 }
342 }
343
344 /** @inheritDoc */
345 public get info (): PoolInfo {
346 return {
347 version,
348 type: this.type,
349 worker: this.worker,
350 ready: this.ready,
351 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
352 minSize: this.minSize,
353 maxSize: this.maxSize,
354 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
355 .runTime.aggregate &&
356 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
357 .waitTime.aggregate && { utilization: round(this.utilization) }),
358 workerNodes: this.workerNodes.length,
359 idleWorkerNodes: this.workerNodes.reduce(
360 (accumulator, workerNode) =>
361 workerNode.usage.tasks.executing === 0
362 ? accumulator + 1
363 : accumulator,
364 0
365 ),
366 busyWorkerNodes: this.workerNodes.reduce(
367 (accumulator, workerNode) =>
368 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
369 0
370 ),
371 executedTasks: this.workerNodes.reduce(
372 (accumulator, workerNode) =>
373 accumulator + workerNode.usage.tasks.executed,
374 0
375 ),
376 executingTasks: this.workerNodes.reduce(
377 (accumulator, workerNode) =>
378 accumulator + workerNode.usage.tasks.executing,
379 0
380 ),
381 ...(this.opts.enableTasksQueue === true && {
382 queuedTasks: this.workerNodes.reduce(
383 (accumulator, workerNode) =>
384 accumulator + workerNode.usage.tasks.queued,
385 0
386 )
387 }),
388 ...(this.opts.enableTasksQueue === true && {
389 maxQueuedTasks: this.workerNodes.reduce(
390 (accumulator, workerNode) =>
391 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
392 0
393 )
394 }),
395 ...(this.opts.enableTasksQueue === true && {
396 backPressure: this.hasBackPressure()
397 }),
398 failedTasks: this.workerNodes.reduce(
399 (accumulator, workerNode) =>
400 accumulator + workerNode.usage.tasks.failed,
401 0
402 ),
403 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
404 .runTime.aggregate && {
405 runTime: {
406 minimum: round(
407 Math.min(
408 ...this.workerNodes.map(
409 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
410 )
411 )
412 ),
413 maximum: round(
414 Math.max(
415 ...this.workerNodes.map(
416 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
417 )
418 )
419 ),
420 average: round(
421 this.workerNodes.reduce(
422 (accumulator, workerNode) =>
423 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
424 0
425 ) /
426 this.workerNodes.reduce(
427 (accumulator, workerNode) =>
428 accumulator + (workerNode.usage.tasks?.executed ?? 0),
429 0
430 )
431 ),
432 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
433 .runTime.median && {
434 median: round(
435 median(
436 this.workerNodes.map(
437 (workerNode) => workerNode.usage.runTime?.median ?? 0
438 )
439 )
440 )
441 })
442 }
443 }),
444 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
445 .waitTime.aggregate && {
446 waitTime: {
447 minimum: round(
448 Math.min(
449 ...this.workerNodes.map(
450 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
451 )
452 )
453 ),
454 maximum: round(
455 Math.max(
456 ...this.workerNodes.map(
457 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
458 )
459 )
460 ),
461 average: round(
462 this.workerNodes.reduce(
463 (accumulator, workerNode) =>
464 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
465 0
466 ) /
467 this.workerNodes.reduce(
468 (accumulator, workerNode) =>
469 accumulator + (workerNode.usage.tasks?.executed ?? 0),
470 0
471 )
472 ),
473 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
474 .waitTime.median && {
475 median: round(
476 median(
477 this.workerNodes.map(
478 (workerNode) => workerNode.usage.waitTime?.median ?? 0
479 )
480 )
481 )
482 })
483 }
484 })
485 }
486 }
487
488 /**
489 * The pool readiness boolean status.
490 */
491 private get ready (): boolean {
492 return (
493 this.workerNodes.reduce(
494 (accumulator, workerNode) =>
495 !workerNode.info.dynamic && workerNode.info.ready
496 ? accumulator + 1
497 : accumulator,
498 0
499 ) >= this.minSize
500 )
501 }
502
503 /**
504 * The approximate pool utilization.
505 *
506 * @returns The pool utilization.
507 */
508 private get utilization (): number {
509 const poolTimeCapacity =
510 (performance.now() - this.startTimestamp) * this.maxSize
511 const totalTasksRunTime = this.workerNodes.reduce(
512 (accumulator, workerNode) =>
513 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
514 0
515 )
516 const totalTasksWaitTime = this.workerNodes.reduce(
517 (accumulator, workerNode) =>
518 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
519 0
520 )
521 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
522 }
523
524 /**
525 * The pool type.
526 *
527 * If it is `'dynamic'`, it provides the `max` property.
528 */
529 protected abstract get type (): PoolType
530
531 /**
532 * The worker type.
533 */
534 protected abstract get worker (): WorkerType
535
536 /**
537 * The pool minimum size.
538 */
539 protected get minSize (): number {
540 return this.numberOfWorkers
541 }
542
543 /**
544 * The pool maximum size.
545 */
546 protected get maxSize (): number {
547 return this.max ?? this.numberOfWorkers
548 }
549
550 /**
551 * Checks if the worker id sent in the received message from a worker is valid.
552 *
553 * @param message - The received message.
554 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
555 */
556 private checkMessageWorkerId (message: MessageValue<Response>): void {
557 if (message.workerId == null) {
558 throw new Error('Worker message received without worker id')
559 } else if (
560 message.workerId != null &&
561 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
562 ) {
563 throw new Error(
564 `Worker message received from unknown worker '${message.workerId}'`
565 )
566 }
567 }
568
569 /**
570 * Gets the given worker its worker node key.
571 *
572 * @param worker - The worker.
573 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
574 */
575 private getWorkerNodeKeyByWorker (worker: Worker): number {
576 return this.workerNodes.findIndex(
577 (workerNode) => workerNode.worker === worker
578 )
579 }
580
581 /**
582 * Gets the worker node key given its worker id.
583 *
584 * @param workerId - The worker id.
585 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
586 */
587 private getWorkerNodeKeyByWorkerId (workerId: number): number {
588 return this.workerNodes.findIndex(
589 (workerNode) => workerNode.info.id === workerId
590 )
591 }
592
593 /** @inheritDoc */
594 public setWorkerChoiceStrategy (
595 workerChoiceStrategy: WorkerChoiceStrategy,
596 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
597 ): void {
598 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
599 this.opts.workerChoiceStrategy = workerChoiceStrategy
600 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
601 this.opts.workerChoiceStrategy
602 )
603 if (workerChoiceStrategyOptions != null) {
604 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
605 }
606 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
607 workerNode.resetUsage()
608 this.sendStatisticsMessageToWorker(workerNodeKey)
609 }
610 }
611
612 /** @inheritDoc */
613 public setWorkerChoiceStrategyOptions (
614 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
615 ): void {
616 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
617 this.opts.workerChoiceStrategyOptions = {
618 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
619 ...workerChoiceStrategyOptions
620 }
621 this.workerChoiceStrategyContext.setOptions(
622 this.opts.workerChoiceStrategyOptions
623 )
624 }
625
626 /** @inheritDoc */
627 public enableTasksQueue (
628 enable: boolean,
629 tasksQueueOptions?: TasksQueueOptions
630 ): void {
631 if (this.opts.enableTasksQueue === true && !enable) {
632 this.flushTasksQueues()
633 }
634 this.opts.enableTasksQueue = enable
635 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
636 }
637
638 /** @inheritDoc */
639 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
640 if (this.opts.enableTasksQueue === true) {
641 this.checkValidTasksQueueOptions(tasksQueueOptions)
642 this.opts.tasksQueueOptions =
643 this.buildTasksQueueOptions(tasksQueueOptions)
644 this.setTasksQueueMaxSize(
645 this.opts.tasksQueueOptions.queueMaxSize as number
646 )
647 } else if (this.opts.tasksQueueOptions != null) {
648 delete this.opts.tasksQueueOptions
649 }
650 }
651
652 private setTasksQueueMaxSize (queueMaxSize: number): void {
653 for (const workerNode of this.workerNodes) {
654 workerNode.tasksQueueBackPressureSize = queueMaxSize
655 }
656 }
657
658 private buildTasksQueueOptions (
659 tasksQueueOptions: TasksQueueOptions
660 ): TasksQueueOptions {
661 return {
662 ...{
663 queueMaxSize: Math.pow(this.maxSize, 2),
664 concurrency: 1
665 },
666 ...tasksQueueOptions
667 }
668 }
669
670 /**
671 * Whether the pool is full or not.
672 *
673 * The pool filling boolean status.
674 */
675 protected get full (): boolean {
676 return this.workerNodes.length >= this.maxSize
677 }
678
679 /**
680 * Whether the pool is busy or not.
681 *
682 * The pool busyness boolean status.
683 */
684 protected abstract get busy (): boolean
685
686 /**
687 * Whether worker nodes are executing concurrently their tasks quota or not.
688 *
689 * @returns Worker nodes busyness boolean status.
690 */
691 protected internalBusy (): boolean {
692 if (this.opts.enableTasksQueue === true) {
693 return (
694 this.workerNodes.findIndex(
695 (workerNode) =>
696 workerNode.info.ready &&
697 workerNode.usage.tasks.executing <
698 (this.opts.tasksQueueOptions?.concurrency as number)
699 ) === -1
700 )
701 } else {
702 return (
703 this.workerNodes.findIndex(
704 (workerNode) =>
705 workerNode.info.ready && workerNode.usage.tasks.executing === 0
706 ) === -1
707 )
708 }
709 }
710
711 /** @inheritDoc */
712 public listTaskFunctions (): string[] {
713 for (const workerNode of this.workerNodes) {
714 if (
715 Array.isArray(workerNode.info.taskFunctions) &&
716 workerNode.info.taskFunctions.length > 0
717 ) {
718 return workerNode.info.taskFunctions
719 }
720 }
721 return []
722 }
723
724 /** @inheritDoc */
725 public async execute (
726 data?: Data,
727 name?: string,
728 transferList?: TransferListItem[]
729 ): Promise<Response> {
730 return await new Promise<Response>((resolve, reject) => {
731 if (!this.started) {
732 reject(new Error('Cannot execute a task on destroyed pool'))
733 }
734 if (name != null && typeof name !== 'string') {
735 reject(new TypeError('name argument must be a string'))
736 }
737 if (
738 name != null &&
739 typeof name === 'string' &&
740 name.trim().length === 0
741 ) {
742 reject(new TypeError('name argument must not be an empty string'))
743 }
744 if (transferList != null && !Array.isArray(transferList)) {
745 reject(new TypeError('transferList argument must be an array'))
746 }
747 const timestamp = performance.now()
748 const workerNodeKey = this.chooseWorkerNode()
749 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
750 if (
751 name != null &&
752 Array.isArray(workerInfo.taskFunctions) &&
753 !workerInfo.taskFunctions.includes(name)
754 ) {
755 reject(
756 new Error(`Task function '${name}' is not registered in the pool`)
757 )
758 }
759 const task: Task<Data> = {
760 name: name ?? DEFAULT_TASK_NAME,
761 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
762 data: data ?? ({} as Data),
763 transferList,
764 timestamp,
765 workerId: workerInfo.id as number,
766 taskId: randomUUID()
767 }
768 this.promiseResponseMap.set(task.taskId as string, {
769 resolve,
770 reject,
771 workerNodeKey
772 })
773 if (
774 this.opts.enableTasksQueue === false ||
775 (this.opts.enableTasksQueue === true &&
776 this.workerNodes[workerNodeKey].usage.tasks.executing <
777 (this.opts.tasksQueueOptions?.concurrency as number))
778 ) {
779 this.executeTask(workerNodeKey, task)
780 } else {
781 this.enqueueTask(workerNodeKey, task)
782 }
783 })
784 }
785
786 /** @inheritDoc */
787 public async destroy (): Promise<void> {
788 await Promise.all(
789 this.workerNodes.map(async (_, workerNodeKey) => {
790 await this.destroyWorkerNode(workerNodeKey)
791 })
792 )
793 this.emitter?.emit(PoolEvents.destroy, this.info)
794 this.started = false
795 }
796
797 protected async sendKillMessageToWorker (
798 workerNodeKey: number,
799 workerId: number
800 ): Promise<void> {
801 await new Promise<void>((resolve, reject) => {
802 this.registerWorkerMessageListener(workerNodeKey, (message) => {
803 if (message.kill === 'success') {
804 resolve()
805 } else if (message.kill === 'failure') {
806 reject(new Error(`Worker ${workerId} kill message handling failed`))
807 }
808 })
809 this.sendToWorker(workerNodeKey, { kill: true, workerId })
810 })
811 }
812
813 /**
814 * Terminates the worker node given its worker node key.
815 *
816 * @param workerNodeKey - The worker node key.
817 */
818 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
819
820 /**
821 * Setup hook to execute code before worker nodes are created in the abstract constructor.
822 * Can be overridden.
823 *
824 * @virtual
825 */
826 protected setupHook (): void {
827 // Intentionally empty
828 }
829
830 /**
831 * Should return whether the worker is the main worker or not.
832 */
833 protected abstract isMain (): boolean
834
835 /**
836 * Hook executed before the worker task execution.
837 * Can be overridden.
838 *
839 * @param workerNodeKey - The worker node key.
840 * @param task - The task to execute.
841 */
842 protected beforeTaskExecutionHook (
843 workerNodeKey: number,
844 task: Task<Data>
845 ): void {
846 if (this.workerNodes[workerNodeKey]?.usage != null) {
847 const workerUsage = this.workerNodes[workerNodeKey].usage
848 ++workerUsage.tasks.executing
849 this.updateWaitTimeWorkerUsage(workerUsage, task)
850 }
851 if (
852 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
853 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
854 task.name as string
855 ) != null
856 ) {
857 const taskFunctionWorkerUsage = this.workerNodes[
858 workerNodeKey
859 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
860 ++taskFunctionWorkerUsage.tasks.executing
861 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
862 }
863 }
864
865 /**
866 * Hook executed after the worker task execution.
867 * Can be overridden.
868 *
869 * @param workerNodeKey - The worker node key.
870 * @param message - The received message.
871 */
872 protected afterTaskExecutionHook (
873 workerNodeKey: number,
874 message: MessageValue<Response>
875 ): void {
876 if (this.workerNodes[workerNodeKey]?.usage != null) {
877 const workerUsage = this.workerNodes[workerNodeKey].usage
878 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
879 this.updateRunTimeWorkerUsage(workerUsage, message)
880 this.updateEluWorkerUsage(workerUsage, message)
881 }
882 if (
883 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
884 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
885 message.taskPerformance?.name as string
886 ) != null
887 ) {
888 const taskFunctionWorkerUsage = this.workerNodes[
889 workerNodeKey
890 ].getTaskFunctionWorkerUsage(
891 message.taskPerformance?.name as string
892 ) as WorkerUsage
893 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
894 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
895 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
896 }
897 }
898
899 /**
900 * Whether the worker node shall update its task function worker usage or not.
901 *
902 * @param workerNodeKey - The worker node key.
903 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
904 */
905 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
906 const workerInfo = this.getWorkerInfo(workerNodeKey)
907 return (
908 workerInfo != null &&
909 Array.isArray(workerInfo.taskFunctions) &&
910 workerInfo.taskFunctions.length > 2
911 )
912 }
913
914 private updateTaskStatisticsWorkerUsage (
915 workerUsage: WorkerUsage,
916 message: MessageValue<Response>
917 ): void {
918 const workerTaskStatistics = workerUsage.tasks
919 if (
920 workerTaskStatistics.executing != null &&
921 workerTaskStatistics.executing > 0
922 ) {
923 --workerTaskStatistics.executing
924 } else if (
925 workerTaskStatistics.executing != null &&
926 workerTaskStatistics.executing < 0
927 ) {
928 throw new Error(
929 'Worker usage statistic for tasks executing cannot be negative'
930 )
931 }
932 if (message.taskError == null) {
933 ++workerTaskStatistics.executed
934 } else {
935 ++workerTaskStatistics.failed
936 }
937 }
938
939 private updateRunTimeWorkerUsage (
940 workerUsage: WorkerUsage,
941 message: MessageValue<Response>
942 ): void {
943 updateMeasurementStatistics(
944 workerUsage.runTime,
945 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
946 message.taskPerformance?.runTime ?? 0,
947 workerUsage.tasks.executed
948 )
949 }
950
951 private updateWaitTimeWorkerUsage (
952 workerUsage: WorkerUsage,
953 task: Task<Data>
954 ): void {
955 const timestamp = performance.now()
956 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
957 updateMeasurementStatistics(
958 workerUsage.waitTime,
959 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
960 taskWaitTime,
961 workerUsage.tasks.executed
962 )
963 }
964
965 private updateEluWorkerUsage (
966 workerUsage: WorkerUsage,
967 message: MessageValue<Response>
968 ): void {
969 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
970 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
971 updateMeasurementStatistics(
972 workerUsage.elu.active,
973 eluTaskStatisticsRequirements,
974 message.taskPerformance?.elu?.active ?? 0,
975 workerUsage.tasks.executed
976 )
977 updateMeasurementStatistics(
978 workerUsage.elu.idle,
979 eluTaskStatisticsRequirements,
980 message.taskPerformance?.elu?.idle ?? 0,
981 workerUsage.tasks.executed
982 )
983 if (eluTaskStatisticsRequirements.aggregate) {
984 if (message.taskPerformance?.elu != null) {
985 if (workerUsage.elu.utilization != null) {
986 workerUsage.elu.utilization =
987 (workerUsage.elu.utilization +
988 message.taskPerformance.elu.utilization) /
989 2
990 } else {
991 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
992 }
993 }
994 }
995 }
996
997 /**
998 * Chooses a worker node for the next task.
999 *
1000 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1001 *
1002 * @returns The chosen worker node key
1003 */
1004 private chooseWorkerNode (): number {
1005 if (this.shallCreateDynamicWorker()) {
1006 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1007 if (
1008 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1009 ) {
1010 return workerNodeKey
1011 }
1012 }
1013 return this.workerChoiceStrategyContext.execute()
1014 }
1015
1016 /**
1017 * Conditions for dynamic worker creation.
1018 *
1019 * @returns Whether to create a dynamic worker or not.
1020 */
1021 private shallCreateDynamicWorker (): boolean {
1022 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1023 }
1024
1025 /**
1026 * Sends a message to worker given its worker node key.
1027 *
1028 * @param workerNodeKey - The worker node key.
1029 * @param message - The message.
1030 * @param transferList - The optional array of transferable objects.
1031 */
1032 protected abstract sendToWorker (
1033 workerNodeKey: number,
1034 message: MessageValue<Data>,
1035 transferList?: TransferListItem[]
1036 ): void
1037
1038 /**
1039 * Creates a new worker.
1040 *
1041 * @returns Newly created worker.
1042 */
1043 protected abstract createWorker (): Worker
1044
1045 /**
1046 * Creates a new, completely set up worker node.
1047 *
1048 * @returns New, completely set up worker node key.
1049 */
1050 protected createAndSetupWorkerNode (): number {
1051 const worker = this.createWorker()
1052
1053 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1054 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1055 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1056 worker.on('error', (error) => {
1057 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1058 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
1059 workerInfo.ready = false
1060 this.workerNodes[workerNodeKey].closeChannel()
1061 this.emitter?.emit(PoolEvents.error, error)
1062 if (
1063 this.opts.restartWorkerOnError === true &&
1064 !this.starting &&
1065 this.started
1066 ) {
1067 if (workerInfo.dynamic) {
1068 this.createAndSetupDynamicWorkerNode()
1069 } else {
1070 this.createAndSetupWorkerNode()
1071 }
1072 }
1073 if (this.opts.enableTasksQueue === true) {
1074 this.redistributeQueuedTasks(workerNodeKey)
1075 }
1076 })
1077 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1078 worker.once('exit', () => {
1079 this.removeWorkerNode(worker)
1080 })
1081
1082 const workerNodeKey = this.addWorkerNode(worker)
1083
1084 this.afterWorkerNodeSetup(workerNodeKey)
1085
1086 return workerNodeKey
1087 }
1088
1089 /**
1090 * Creates a new, completely set up dynamic worker node.
1091 *
1092 * @returns New, completely set up dynamic worker node key.
1093 */
1094 protected createAndSetupDynamicWorkerNode (): number {
1095 const workerNodeKey = this.createAndSetupWorkerNode()
1096 this.registerWorkerMessageListener(workerNodeKey, (message) => {
1097 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1098 message.workerId
1099 )
1100 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1101 // Kill message received from worker
1102 if (
1103 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1104 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1105 ((this.opts.enableTasksQueue === false &&
1106 workerUsage.tasks.executing === 0) ||
1107 (this.opts.enableTasksQueue === true &&
1108 workerUsage.tasks.executing === 0 &&
1109 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1110 ) {
1111 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1112 this.emitter?.emit(PoolEvents.error, error)
1113 })
1114 }
1115 })
1116 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
1117 this.sendToWorker(workerNodeKey, {
1118 checkActive: true,
1119 workerId: workerInfo.id as number
1120 })
1121 workerInfo.dynamic = true
1122 if (
1123 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1124 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1125 ) {
1126 workerInfo.ready = true
1127 }
1128 this.checkAndEmitDynamicWorkerCreationEvents()
1129 return workerNodeKey
1130 }
1131
1132 /**
1133 * Registers a listener callback on the worker given its worker node key.
1134 *
1135 * @param workerNodeKey - The worker node key.
1136 * @param listener - The message listener callback.
1137 */
1138 protected abstract registerWorkerMessageListener<
1139 Message extends Data | Response
1140 >(
1141 workerNodeKey: number,
1142 listener: (message: MessageValue<Message>) => void
1143 ): void
1144
1145 /**
1146 * Method hooked up after a worker node has been newly created.
1147 * Can be overridden.
1148 *
1149 * @param workerNodeKey - The newly created worker node key.
1150 */
1151 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1152 // Listen to worker messages.
1153 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1154 // Send the startup message to worker.
1155 this.sendStartupMessageToWorker(workerNodeKey)
1156 // Send the statistics message to worker.
1157 this.sendStatisticsMessageToWorker(workerNodeKey)
1158 if (this.opts.enableTasksQueue === true) {
1159 this.workerNodes[workerNodeKey].onBackPressure =
1160 this.tasksStealingOnBackPressure.bind(this)
1161 }
1162 }
1163
1164 /**
1165 * Sends the startup message to worker given its worker node key.
1166 *
1167 * @param workerNodeKey - The worker node key.
1168 */
1169 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1170
1171 /**
1172 * Sends the statistics message to worker given its worker node key.
1173 *
1174 * @param workerNodeKey - The worker node key.
1175 */
1176 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1177 this.sendToWorker(workerNodeKey, {
1178 statistics: {
1179 runTime:
1180 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1181 .runTime.aggregate,
1182 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1183 .elu.aggregate
1184 },
1185 workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
1186 })
1187 }
1188
1189 private redistributeQueuedTasks (workerNodeKey: number): void {
1190 while (this.tasksQueueSize(workerNodeKey) > 0) {
1191 let targetWorkerNodeKey: number = workerNodeKey
1192 let minQueuedTasks = Infinity
1193 let executeTask = false
1194 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1195 if (
1196 this.workerNodes[workerNodeId].usage.tasks.executing <
1197 (this.opts.tasksQueueOptions?.concurrency as number)
1198 ) {
1199 executeTask = true
1200 }
1201 if (
1202 workerNodeId !== workerNodeKey &&
1203 workerNode.info.ready &&
1204 workerNode.usage.tasks.queued === 0
1205 ) {
1206 targetWorkerNodeKey = workerNodeId
1207 break
1208 }
1209 if (
1210 workerNodeId !== workerNodeKey &&
1211 workerNode.info.ready &&
1212 workerNode.usage.tasks.queued < minQueuedTasks
1213 ) {
1214 minQueuedTasks = workerNode.usage.tasks.queued
1215 targetWorkerNodeKey = workerNodeId
1216 }
1217 }
1218 if (executeTask) {
1219 this.executeTask(
1220 targetWorkerNodeKey,
1221 this.dequeueTask(workerNodeKey) as Task<Data>
1222 )
1223 } else {
1224 this.enqueueTask(
1225 targetWorkerNodeKey,
1226 this.dequeueTask(workerNodeKey) as Task<Data>
1227 )
1228 }
1229 }
1230 }
1231
1232 private tasksStealingOnBackPressure (workerId: number): void {
1233 const sourceWorkerNode =
1234 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1235 const workerNodes = this.workerNodes
1236 .filter((workerNode) => workerNode.info.id !== workerId)
1237 .sort(
1238 (workerNodeA, workerNodeB) =>
1239 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1240 )
1241 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1242 if (
1243 workerNode.info.ready &&
1244 sourceWorkerNode.usage.tasks.queued > 0 &&
1245 !workerNode.hasBackPressure() &&
1246 workerNode.usage.tasks.executing <
1247 (this.opts.tasksQueueOptions?.concurrency as number)
1248 ) {
1249 this.executeTask(
1250 workerNodeKey,
1251 sourceWorkerNode.popTask() as Task<Data>
1252 )
1253 } else if (
1254 workerNode.info.ready &&
1255 sourceWorkerNode.usage.tasks.queued > 0 &&
1256 !workerNode.hasBackPressure() &&
1257 workerNode.usage.tasks.executing >=
1258 (this.opts.tasksQueueOptions?.concurrency as number)
1259 ) {
1260 this.enqueueTask(
1261 workerNodeKey,
1262 sourceWorkerNode.popTask() as Task<Data>
1263 )
1264 }
1265 }
1266 }
1267
1268 /**
1269 * This method is the listener registered for each worker message.
1270 *
1271 * @returns The listener function to execute when a message is received from a worker.
1272 */
1273 protected workerListener (): (message: MessageValue<Response>) => void {
1274 return (message) => {
1275 this.checkMessageWorkerId(message)
1276 if (message.ready != null && message.taskFunctions != null) {
1277 // Worker ready response received from worker
1278 this.handleWorkerReadyResponse(message)
1279 } else if (message.taskId != null) {
1280 // Task execution response received from worker
1281 this.handleTaskExecutionResponse(message)
1282 } else if (message.taskFunctions != null) {
1283 // Task functions message received from worker
1284 (
1285 this.getWorkerInfo(
1286 this.getWorkerNodeKeyByWorkerId(message.workerId)
1287 ) as WorkerInfo
1288 ).taskFunctions = message.taskFunctions
1289 }
1290 }
1291 }
1292
1293 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1294 if (message.ready === false) {
1295 throw new Error(`Worker ${message.workerId} failed to initialize`)
1296 }
1297 const workerInfo = this.getWorkerInfo(
1298 this.getWorkerNodeKeyByWorkerId(message.workerId)
1299 ) as WorkerInfo
1300 workerInfo.ready = message.ready as boolean
1301 workerInfo.taskFunctions = message.taskFunctions
1302 if (this.emitter != null && this.ready) {
1303 this.emitter.emit(PoolEvents.ready, this.info)
1304 }
1305 }
1306
1307 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1308 const { taskId, taskError, data } = message
1309 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1310 if (promiseResponse != null) {
1311 if (taskError != null) {
1312 this.emitter?.emit(PoolEvents.taskError, taskError)
1313 promiseResponse.reject(taskError.message)
1314 } else {
1315 promiseResponse.resolve(data as Response)
1316 }
1317 const workerNodeKey = promiseResponse.workerNodeKey
1318 this.afterTaskExecutionHook(workerNodeKey, message)
1319 this.promiseResponseMap.delete(taskId as string)
1320 if (
1321 this.opts.enableTasksQueue === true &&
1322 this.tasksQueueSize(workerNodeKey) > 0 &&
1323 this.workerNodes[workerNodeKey].usage.tasks.executing <
1324 (this.opts.tasksQueueOptions?.concurrency as number)
1325 ) {
1326 this.executeTask(
1327 workerNodeKey,
1328 this.dequeueTask(workerNodeKey) as Task<Data>
1329 )
1330 }
1331 this.workerChoiceStrategyContext.update(workerNodeKey)
1332 }
1333 }
1334
1335 private checkAndEmitTaskExecutionEvents (): void {
1336 if (this.busy) {
1337 this.emitter?.emit(PoolEvents.busy, this.info)
1338 }
1339 }
1340
1341 private checkAndEmitTaskQueuingEvents (): void {
1342 if (this.hasBackPressure()) {
1343 this.emitter?.emit(PoolEvents.backPressure, this.info)
1344 }
1345 }
1346
1347 private checkAndEmitDynamicWorkerCreationEvents (): void {
1348 if (this.type === PoolTypes.dynamic) {
1349 if (this.full) {
1350 this.emitter?.emit(PoolEvents.full, this.info)
1351 }
1352 }
1353 }
1354
1355 /**
1356 * Gets the worker information given its worker node key.
1357 *
1358 * @param workerNodeKey - The worker node key.
1359 * @returns The worker information.
1360 */
1361 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1362 return this.workerNodes[workerNodeKey]?.info
1363 }
1364
1365 /**
1366 * Adds the given worker in the pool worker nodes.
1367 *
1368 * @param worker - The worker.
1369 * @returns The added worker node key.
1370 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1371 */
1372 private addWorkerNode (worker: Worker): number {
1373 const workerNode = new WorkerNode<Worker, Data>(
1374 worker,
1375 this.worker,
1376 this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2)
1377 )
1378 // Flag the worker node as ready at pool startup.
1379 if (this.starting) {
1380 workerNode.info.ready = true
1381 }
1382 this.workerNodes.push(workerNode)
1383 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1384 if (workerNodeKey === -1) {
1385 throw new Error('Worker node added not found')
1386 }
1387 return workerNodeKey
1388 }
1389
1390 /**
1391 * Removes the given worker from the pool worker nodes.
1392 *
1393 * @param worker - The worker.
1394 */
1395 private removeWorkerNode (worker: Worker): void {
1396 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1397 if (workerNodeKey !== -1) {
1398 this.workerNodes.splice(workerNodeKey, 1)
1399 this.workerChoiceStrategyContext.remove(workerNodeKey)
1400 }
1401 }
1402
1403 /** @inheritDoc */
1404 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1405 return (
1406 this.opts.enableTasksQueue === true &&
1407 this.workerNodes[workerNodeKey].hasBackPressure()
1408 )
1409 }
1410
1411 private hasBackPressure (): boolean {
1412 return (
1413 this.opts.enableTasksQueue === true &&
1414 this.workerNodes.findIndex(
1415 (workerNode) => !workerNode.hasBackPressure()
1416 ) === -1
1417 )
1418 }
1419
1420 /**
1421 * Executes the given task on the worker given its worker node key.
1422 *
1423 * @param workerNodeKey - The worker node key.
1424 * @param task - The task to execute.
1425 */
1426 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1427 this.beforeTaskExecutionHook(workerNodeKey, task)
1428 this.sendToWorker(workerNodeKey, task, task.transferList)
1429 this.checkAndEmitTaskExecutionEvents()
1430 }
1431
1432 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1433 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1434 this.checkAndEmitTaskQueuingEvents()
1435 return tasksQueueSize
1436 }
1437
1438 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1439 return this.workerNodes[workerNodeKey].dequeueTask()
1440 }
1441
1442 private tasksQueueSize (workerNodeKey: number): number {
1443 return this.workerNodes[workerNodeKey].tasksQueueSize()
1444 }
1445
1446 protected flushTasksQueue (workerNodeKey: number): void {
1447 while (this.tasksQueueSize(workerNodeKey) > 0) {
1448 this.executeTask(
1449 workerNodeKey,
1450 this.dequeueTask(workerNodeKey) as Task<Data>
1451 )
1452 }
1453 this.workerNodes[workerNodeKey].clearTasksQueue()
1454 }
1455
1456 private flushTasksQueues (): void {
1457 for (const [workerNodeKey] of this.workerNodes.entries()) {
1458 this.flushTasksQueue(workerNodeKey)
1459 }
1460 }
1461 }