e61991f222ae98024f47038d8e211b420b8ed507
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task id.
 *
 * - `key`: The id of a submitted task.
 * - `value`: The promise `resolve` and `reject` callbacks of that task together with the key of the worker node it was assigned to.
 *
 * An entry is added each time a task is submitted through `execute()`.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map()

/**
 * Context wrapping the worker choice strategy currently used to pick a worker node.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Placeholder for the maximum size of a dynamic pool. When unset, `maxSize` falls back to `numberOfWorkers`.
 */
protected readonly max?: number

/**
 * `true` only while the constructor is creating the initial worker nodes.
 */
private readonly starting: boolean
/**
 * `true` once the constructor has completed, `false` again after `destroy()`.
 */
private started: boolean
/**
 * `performance.now()` value captured at the end of the constructor.
 */
private readonly startTimestamp
104
/**
 * Builds a new poolifier pool: validates the arguments, creates the worker choice
 * strategy context, runs the setup hook and finally starts the worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` reports that the caller is not the main worker, or if one of the arguments is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Argument validation; checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Pin `this` on the methods below.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // `starting` is raised only for the duration of the initial worker nodes creation.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
152
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is nullish, not a string, blank, or does not exist on disk.
 */
private checkFilePath (filePath: string): void {
  const isMissing =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (isMissing) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const fileFound = existsSync(filePath)
  if (!fileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
165
/**
 * Validates the number of workers requested for the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number is nullish.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is forbidden to start with zero worker.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
183
/**
 * Validates the size bounds of a dynamic pool. Does nothing for any other pool type.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `max` is nullish or not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `max` is zero, lower than `min` or equal to `min`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
209
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  // User supplied strategy options take precedence over the defaults.
  const workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    return
  }
  // Tasks queue options are only validated and built when the queue is enabled.
  const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
}
237
/**
 * Ensures the given strategy is one of the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
247
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `choiceRetries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `choiceRetries` is lower than or equal to zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` keys differs from the pool maximum size, or if `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  if (workerChoiceStrategyOptions.choiceRetries != null) {
    if (!Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (workerChoiceStrategyOptions.choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
      )
    }
  }
  if (workerChoiceStrategyOptions.weights != null) {
    // One weight is expected per worker node the pool can hold.
    const weightsCount = Object.keys(workerChoiceStrategyOptions.weights).length
    if (weightsCount !== this.maxSize) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  if (workerChoiceStrategyOptions.measurement != null) {
    const knownMeasurements = Object.values(Measurements)
    if (!knownMeasurements.includes(workerChoiceStrategyOptions.measurement)) {
      throw new Error(
        `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
      )
    }
  }
}
291
/**
 * Validates the tasks queue options. Nullish options are accepted as is.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency` / `queueMaxSize` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` / `queueMaxSize` is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  if (tasksQueueOptions.concurrency != null) {
    if (!Number.isSafeInteger(tasksQueueOptions.concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (tasksQueueOptions.concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
      )
    }
  }
  if (tasksQueueOptions.queueMaxSize != null) {
    if (!Number.isSafeInteger(tasksQueueOptions.queueMaxSize)) {
      throw new TypeError(
        'Invalid worker node tasks queue max size: must be an integer'
      )
    }
    if (tasksQueueOptions.queueMaxSize <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero`
      )
    }
  }
}
331
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non dynamic ones.
 */
private startPool (): void {
  const nonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
343
/** @inheritDoc */
public get info (): PoolInfo {
  type WorkerNodeMetric = (workerNode: IWorkerNode<Worker, Data>) => number

  // The statistics requirements are read once for the whole snapshot.
  // NOTE(review): assumes getTaskStatisticsRequirements() is a side effect free accessor - confirm.
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Adds up one metric over all the worker nodes.
  const sum = (metric: WorkerNodeMetric): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + metric(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const count = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) =>
        predicate(workerNode) ? accumulator + 1 : accumulator,
      0
    )
  // Collects one metric per worker node.
  const perWorkerNode = (metric: WorkerNodeMetric): number[] =>
    this.workerNodes.map((workerNode) => metric(workerNode))
  // Average per executed task; 0 instead of NaN (0 / 0) when no task has been executed yet.
  const averagePerExecutedTask = (aggregate: number): number => {
    const executed = sum((workerNode) => workerNode.usage.tasks?.executed ?? 0)
    return executed > 0 ? aggregate / executed : 0
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(requirements.runTime.aggregate &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: count(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: count(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sum((workerNode) => workerNode.usage.tasks.executed),
    executingTasks: sum((workerNode) => workerNode.usage.tasks.executing),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sum((workerNode) => workerNode.usage.tasks.queued),
      maxQueuedTasks: sum(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sum((workerNode) => workerNode.usage.tasks.failed),
    ...(requirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...perWorkerNode(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...perWorkerNode(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sum((workerNode) => workerNode.usage.runTime?.aggregate ?? 0)
          )
        ),
        ...(requirements.runTime.median && {
          median: round(
            median(
              perWorkerNode(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(requirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...perWorkerNode(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...perWorkerNode(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sum((workerNode) => workerNode.usage.waitTime?.aggregate ?? 0)
          )
        ),
        ...(requirements.waitTime.median && {
          median: round(
            median(
              perWorkerNode(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
487
/**
 * Pool readiness status: `true` when at least `minSize` non dynamic worker nodes are flagged ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    ({ info }) => !info.dynamic && info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
502
/**
 * Approximate pool utilization: aggregated tasks run time plus aggregated tasks wait time,
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
523
/**
 * Type of this pool, supplied by the concrete pool class.
 *
 * A `'dynamic'` pool provides the `max` property.
 */
protected abstract get type (): PoolType
530
/**
 * Type of worker managed by this pool, supplied by the concrete pool class.
 */
protected abstract get worker (): WorkerType
535
/**
 * Minimum size of the pool, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
542
/**
 * Maximum size of the pool: `max` when it is set, the number of workers given at construction otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
549
/**
 * Verifies that a message received from a worker carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
568
/**
 * Looks up the key of the worker node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (let workerNodeKey = 0; workerNodeKey < this.workerNodes.length; workerNodeKey++) {
    if (this.workerNodes[workerNodeKey].worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
580
/**
 * Looks up the key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (let workerNodeKey = 0; workerNodeKey < this.workerNodes.length; workerNodeKey++) {
    if (this.workerNodes[workerNodeKey].info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
592
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  // Reject unknown strategies before touching the pool state.
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node gets its usage reset and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
611
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given options are layered on top of the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
625
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching the queue off: flush the tasks queues before recording the new state.
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
637
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueMaxSize(builtOptions.queueMaxSize as number)
}
651
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param queueMaxSize - The value assigned to each worker node `tasksQueueBackPressureSize`.
 */
private setTasksQueueMaxSize (queueMaxSize: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = queueMaxSize
  })
}
657
/**
 * Completes the given tasks queue options with the defaults:
 * a queue max size of the pool maximum size squared and a concurrency of 1.
 *
 * @param tasksQueueOptions - The tasks queue options; its properties override the defaults.
 * @returns The completed tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultQueueMaxSize = Math.pow(this.maxSize, 2)
  return {
    queueMaxSize: defaultQueueMaxSize,
    concurrency: 1,
    ...tasksQueueOptions
  }
}
669
/**
 * Pool filling status: `true` when the number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
678
/**
 * Pool busyness status, supplied by the concrete pool class.
 */
protected abstract get busy (): boolean
685
/**
 * Tells whether no ready worker node can take one more task right away.
 *
 * With the tasks queue enabled a worker node is available while it executes fewer tasks than
 * the configured concurrency; otherwise it is available only when it executes no task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const isAvailable = (workerNode: IWorkerNode<Worker, Data>): boolean => {
    if (!workerNode.info.ready) {
      return false
    }
    return tasksQueueEnabled
      ? workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
      : workerNode.usage.tasks.executing === 0
  }
  return !this.workerNodes.some(isAvailable)
}
710
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node reporting a non empty task functions list answers for the pool.
  const workerNode = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctions) && info.taskFunctions.length > 0
  )
  return workerNode?.info.taskFunctions ?? []
}
723
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Calling reject() does not stop the executor: each failed check must return
    // explicitly, otherwise the task would still be registered and dispatched.
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Submission timestamp, stored on the task.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // The promise callbacks are stored under the task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the chosen worker node
    // already runs as many tasks as the configured concurrency.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
785
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All the worker nodes are destroyed concurrently.
  const destructions = this.workerNodes.map(
    async (_, workerNodeKey): Promise<void> => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(destructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
796
/**
 * Sends a kill message to a worker and waits for its answer.
 *
 * The returned promise resolves on a `'success'` kill answer and rejects on a `'failure'` one.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      const killStatus = message.kill
      if (killStatus === 'success') {
        resolve()
        return
      }
      if (killStatus === 'failure') {
        reject(new Error(`Worker ${workerId} kill message handling failed`))
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
812
/**
 * Terminates a worker node. Implemented by the concrete pool class.
 *
 * @param workerNodeKey - The key of the worker node to terminate.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
819
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
829
/**
 * Tells whether the current worker is the main worker. The constructor refuses to build a pool when it is not.
 */
protected abstract isMain (): boolean
834
/**
 * Hook run before a task is executed by a worker: increments the executing tasks counter
 * and updates the wait time statistics, for the worker node and, when applicable, for the task function.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerNode.usage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Per task function usage, when the worker node tracks one for this task name.
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
864
/**
 * Hook run after a task has been executed by a worker: updates the tasks, run time and ELU
 * statistics, for the worker node and, when applicable, for the task function.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const refreshUsage = (workerUsage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    refreshUsage(workerNode.usage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Per task function usage, looked up with the name reported in the task performance.
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    refreshUsage(taskFunctionWorkerUsage)
  }
}
898
/**
 * Tells whether the per task function usage of a worker node must be maintained,
 * i.e. whether the worker info lists more than two task functions.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
913
/**
 * Updates the tasks counters of a worker usage once a task response has been received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter never goes below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
931
/**
 * Feeds the run time statistics of a worker usage with the run time reported in the message
 * (0 when the message carries none).
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeStatistics = workerUsage.runTime
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    runTimeStatistics,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
943
/**
 * Feeds the wait time statistics of a worker usage with the time elapsed since the task timestamp
 * (0 when the task has no timestamp).
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const submittedAt = task.timestamp ?? now
  const waitTimeStatistics = workerUsage.waitTime
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    waitTimeStatistics,
    waitTimeRequirements,
    now - submittedAt,
    workerUsage.tasks.executed
  )
}
957
/**
 * Feeds the event loop utilization (ELU) statistics of a worker usage with the values reported in the message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Active then idle measurements; a missing value counts as 0.
  for (const measurement of ['active', 'idle'] as const) {
    updateMeasurementStatistics(
      workerUsage.elu[measurement],
      eluTaskStatisticsRequirements,
      taskElu?.[measurement] ?? 0,
      workerUsage.tasks.executed
    )
  }
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // Utilization: the first value is taken as is, later ones are averaged with the stored value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
989
/**
 * Picks the worker node that will receive the next task.
 *
 * A dynamic worker node is created first when needed; it is chosen directly when the strategy policy
 * `dynamicWorkerUsage` is set, otherwise the worker choice strategy decides.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const useDynamicWorker =
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  if (useDynamicWorker) {
    return dynamicWorkerNodeKey
  }
  return this.workerChoiceStrategyContext.execute()
}
1008
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1017
/**
 * Sends a message to the worker of the given worker node. Implemented by the concrete pool class.
 *
 * @param workerNodeKey - The key of the worker node to send the message to.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1030
/**
 * Instantiates a new worker. Implemented by the concrete pool class.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
1037
/**
 * Creates a worker, wires its event handlers and registers it as a new worker node.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts

  // User supplied handlers are attached first, with a no-op fallback.
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', (error) => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const erroredWorkerInfo = this.getWorkerInfo(
      erroredWorkerNodeKey
    ) as WorkerInfo
    erroredWorkerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement is only spawned on a started pool, never during start-up.
    const shallRestart =
      this.opts.restartWorkerOnError === true &&
      !this.starting &&
      this.started
    if (shallRestart) {
      if (!erroredWorkerInfo.dynamic) {
        this.createAndSetupWorkerNode()
      } else {
        this.createAndSetupDynamicWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool as soon as its worker exits.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const newWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(newWorkerNodeKey)
  return newWorkerNodeKey
}
1081
/**
 * Creates a worker node, flags it as dynamic and registers the listener that destroys it
 * when the worker sends a kill message.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    // The key is resolved again from the worker id carried by each message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is honoured only when the worker node executes no task and,
    // with the tasks queue enabled, has no queued task.
    const softKillAllowed = (): boolean => {
      if (workerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(localWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && softKillAllowed())
    if (shallDestroy) {
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // The strategy policy may flag the dynamic worker node ready right away.
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1124
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Must be implemented by the concrete pool. The pool uses it to attach its own
 * message listener (see `afterWorkerNodeSetup()`) and the kill message
 * listener of dynamic worker nodes (see `createAndSetupDynamicWorkerNode()`).
 *
 * @typeParam Message - Type of the message payload: the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1137
/**
 * Hook invoked once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup and
 * statistics messages and, when the tasks queue is enabled, installs the back
 * pressure callback of the worker node.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Start listening to the worker before sending anything to it.
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, poolMessageListener)
  // Startup message first, statistics message second.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Tasks stealing is triggered by the worker node back pressure callback.
  this.workerNodes[workerNodeKey].onBackPressure =
    this.tasksStealingOnBackPressure.bind(this)
}
1156
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Must be implemented by the concrete pool. It is called by
 * `afterWorkerNodeSetup()` once the pool message listener has been registered
 * on the worker and before the statistics message is sent.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1163
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu },
    workerId: workerInfo.id as number
  })
}
1181
/**
 * Moves the tasks queued on the given worker node to the other ready worker
 * nodes.
 *
 * For each queued task the destination is the first other ready worker node
 * with an empty tasks queue or, failing that, the other ready worker node with
 * the fewest queued tasks. The task is executed right away if the destination
 * executes fewer tasks than the configured tasks queue concurrency, otherwise
 * it is enqueued on the destination.
 *
 * If no other ready worker node exists, the remaining tasks are left in the
 * queue of the given worker node: sending them back to that very node would
 * either loop forever or hand them to the worker they are moved away from.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other ready worker nodes are eligible destinations.
      if (workerNodeId === workerNodeKey || !workerNode.info.ready) {
        continue
      }
      // An empty tasks queue cannot be beaten: stop searching.
      if (workerNode.usage.tasks.queued === 0) {
        destinationWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        destinationWorkerNodeKey = workerNodeId
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: stop instead of looping on the same queue.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    // The execute or enqueue decision depends on the destination load only.
    if (
      this.workerNodes[destinationWorkerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1224
/**
 * Back pressure callback of a worker node: moves tasks queued on that worker
 * node to the other worker nodes.
 *
 * The other worker nodes are visited by ascending number of queued tasks.
 * Each one that is ready and has no back pressure receives one task popped
 * from the source worker node: the task is executed right away if the
 * receiver executes fewer tasks than the configured tasks queue concurrency,
 * otherwise it is enqueued on the receiver.
 *
 * @param workerId - The id of the worker whose worker node has back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // filter() returns a new array, so sorting it leaves the pool worker nodes
  // order, hence the worker node keys, untouched.
  const candidateWorkerNodes = this.workerNodes
    .filter((workerNode) => workerNode.info.id !== workerId)
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of candidateWorkerNodes) {
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      // Nothing left to move.
      break
    }
    if (!workerNode.info.ready || workerNode.hasBackPressure()) {
      continue
    }
    // The position in the filtered and sorted copy is not the worker node
    // key: look the key up in the pool worker nodes array.
    const workerNodeKey = this.workerNodes.indexOf(workerNode)
    const task = sourceWorkerNode.popTask() as Task<Data>
    if (
      workerNode.usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  }
}
1260
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches on the message
 * content: worker ready response, task execution response or task functions
 * update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
      const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1285
/**
 * Handles the ready response received from a worker.
 *
 * Stores the ready flag and the task functions of the message in the worker
 * information, then emits the pool `ready` event if an emitter exists and
 * the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions } = message
  if (ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1299
/**
 * Handles a task execution response received from a worker.
 *
 * Settles the promise registered for the task id, runs the after task
 * execution hook, forgets the promise, executes the next queued task of the
 * worker node if the tasks queue is enabled and the concurrency allows it,
 * and finally updates the worker choice strategy context.
 * A response without a registered promise is ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    // The promise is rejected with the error message, not the error object.
    promiseResponse.reject(message.taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1327
/**
 * Emits the pool `busy` event, with the pool information, if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1333
/**
 * Emits the pool `backPressure` event, with the pool information, if the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1339
/**
 * Emits the pool `full` event, with the pool information, if the pool is a
 * dynamic pool and is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1347
/**
 * Looks up the information of the worker held by the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if no worker node exists at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1357
/**
 * Wraps the given worker in a worker node and appends it to the pool worker
 * nodes.
 *
 * The tasks queue maximum size of the worker node defaults to the square of
 * the pool maximum size when not set in the tasks queue options.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.queueMaxSize ?? this.maxSize ** 2
  const workerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    tasksQueueMaxSize
  )
  if (this.starting) {
    // At pool startup the worker node is flagged as ready upfront.
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker node added not found')
  }
  return addedWorkerNodeKey
}
1382
/**
 * Drops the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context.
 * Does nothing if the worker has no worker node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1395
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queue there is no back pressure.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1403
/**
 * Whether the pool has back pressure: the tasks queue is enabled and every
 * worker node has back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1412
/**
 * Runs the before task execution hook, sends the task (with its transfer
 * list) to the worker of the given worker node, then checks and emits the
 * task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  const { transferList } = task
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1424
/**
 * Enqueues the task on the given worker node, then checks and emits the task
 * queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node enqueue operation (presumably the resulting tasks queue size).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1430
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1434
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1438
/**
 * Executes every task queued on the given worker node, then clears its tasks
 * queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1448
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1454 }