refactor: cleanup tasks rescheduling code
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The task execution response promise map.
 *
 * - `key`: The task id of each submitted task.
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks.
 *
 * Entries are registered by `execute()` for every submitted task.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Dynamic pool maximum size property placeholder.
 *
 * When it is not set, the pool maximum size falls back to the number of workers.
 */
protected readonly max?: number

/**
 * Whether the pool is starting or not.
 *
 * Only `true` while the constructor is creating the initial worker nodes.
 */
private readonly starting: boolean
/**
 * Whether the pool is started or not.
 *
 * Set to `true` at the end of the constructor and back to `false` by `destroy()`.
 */
private started: boolean
/**
 * The start timestamp of the pool, taken from `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp: number
104
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context,
 * runs the setup hook and then creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Arguments validation, in declaration order
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Keep `this` bound when these methods are handed over as callbacks
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  const { workerChoiceStrategy, workerChoiceStrategyOptions } = opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // `starting` is only true while the initial worker nodes are created
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
152
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is missing, not a string, blank, or if the file does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null` and `undefined` are rejected by the `typeof` test as well
  const isMissingOrBlank =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (isMissingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
165
/**
 * Checks that the number of workers is a safe, non negative integer,
 * and strictly positive for a fixed pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error, TypeError or RangeError depending on the failed check.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
183
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing for any other pool type.
 *
 * @param min - The pool minimum size.
 * @param max - The pool maximum size.
 * @throws TypeError or RangeError depending on the failed check.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
209
/**
 * Validates the pool options and fills in the default value of each unset option.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  const workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  const tasksQueueEnabled = opts.enableTasksQueue ?? false
  this.opts.enableTasksQueue = tasksQueueEnabled
  if (tasksQueueEnabled) {
    // Tasks queue options are only validated and defaulted when the queue is enabled
    this.checkValidTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
237
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
247
/**
 * Validates the worker choice strategy options: choice retries, weights and measurement.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError, RangeError or Error depending on the failed check.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // One weight is expected per worker node of a full pool
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
291
/**
 * Validates the tasks queue options: concurrency and queue maximum size.
 * Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError or RangeError depending on the failed check.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const queueMaxSize = tasksQueueOptions?.queueMaxSize
  if (queueMaxSize != null) {
    if (!Number.isSafeInteger(queueMaxSize)) {
      throw new TypeError(
        'Invalid worker node tasks queue max size: must be an integer'
      )
    }
    if (queueMaxSize <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue max size: ${queueMaxSize} is a negative integer or zero`
      )
    }
  }
}
331
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non dynamic worker nodes.
 */
private startPool (): void {
  // Recounted on every iteration since each creation grows `workerNodes`
  const nonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
343
/** @inheritDoc */
public get info (): PoolInfo {
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Sums a per worker node numeric value over all the worker nodes
  const sum = (
    valueOf: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + valueOf(workerNode),
      0
    )
  // Counts the worker nodes matching the given predicate
  const count = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(predicate).length
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: count(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: count(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sum((workerNode) => workerNode.usage.tasks.executed),
    executingTasks: sum((workerNode) => workerNode.usage.tasks.executing),
    // Queue related figures are only reported when the tasks queue is enabled
    ...(tasksQueueEnabled && {
      queuedTasks: sum((workerNode) => workerNode.usage.tasks.queued),
      maxQueuedTasks: sum(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sum((workerNode) => workerNode.usage.tasks.failed),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Total aggregated run time divided by the total number of executed tasks
        average: round(
          sum((workerNode) => workerNode.usage.runTime?.aggregate ?? 0) /
            sum((workerNode) => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(runTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Total aggregated wait time divided by the total number of executed tasks
        average: round(
          sum((workerNode) => workerNode.usage.waitTime?.aggregate ?? 0) /
            sum((workerNode) => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(waitTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
487
/**
 * The pool readiness boolean status:
 * `true` once at least `minSize` non dynamic worker nodes are ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
502
/**
 * The approximate pool utilization: the aggregated tasks run time and wait time
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
523
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * Compared against `PoolTypes.fixed` and `PoolTypes.dynamic` by the pool size
 * checks and by the dynamic worker creation condition.
 */
protected abstract get type (): PoolType
530
/**
 * The worker type.
 *
 * Reported as the `worker` property of the pool information.
 */
protected abstract get worker (): WorkerType
535
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
542
/**
 * The pool maximum size: `max` when it is set, the number of workers otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
549
/**
 * Checks that the received message carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
568
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
580
/**
 * Gets the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
592
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node restarts from a clean usage and is sent a statistics message
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
611
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given options override the defaults
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
625
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Flush the tasks queues before turning the tasks queue off
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
637
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // No tasks queue: drop any previously stored tasks queue options
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueMaxSize(builtTasksQueueOptions.queueMaxSize as number)
}
651
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param queueMaxSize - The tasks queue maximum size.
 */
private setTasksQueueMaxSize (queueMaxSize: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = queueMaxSize
  })
}
657
/**
 * Builds the tasks queue options by layering the given options over the defaults:
 * a queue maximum size of the squared pool maximum size and a concurrency of one.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The complete tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    queueMaxSize: this.maxSize ** 2,
    concurrency: 1
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
669
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
678
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, to be defined by each concrete pool.
 */
protected abstract get busy (): boolean
685
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, the quota is the configured tasks concurrency;
 * without it, a worker node is busy as soon as it executes one task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  let hasAvailableWorkerNode: boolean
  if (this.opts.enableTasksQueue === true) {
    hasAvailableWorkerNode = this.workerNodes.some(
      (workerNode) =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
  } else {
    hasAvailableWorkerNode = this.workerNodes.some(
      (workerNode) =>
        workerNode.info.ready && workerNode.usage.tasks.executing === 0
    )
  }
  // Busy means that no ready worker node has any execution capacity left
  return !hasAvailableWorkerNode
}
710
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node exposing a non empty task functions list answers for the pool
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
723
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Each failed validation rejects AND returns: `reject()` alone does not
    // stop the executor, so without the `return` an invalid task would still
    // be registered in the promise response map and submitted to a worker.
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Taken before the worker node choice; later used to compute the task wait time
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // Register the promise callbacks under the task id
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the chosen
    // worker node already runs its tasks concurrency quota
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
785
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Terminate all the worker nodes concurrently
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
796
/**
 * Sends a kill message to the worker and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
812
/**
 * Terminates the worker node given its worker node key.
 *
 * Called for every worker node by `destroy()` and by the dynamic worker node
 * kill message listener.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
819
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
829
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
834
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics,
 * on the worker node usage and, when applicable, on the task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const { usage } = workerNode
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskName = task.name as string
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(taskFunctionUsage, task)
}
864
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics,
 * on the worker node usage and, when applicable, on the task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three post execution statistics updates to one usage object
  const updateUsage = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    updateUsage(workerNode.usage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (
    workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    ) == null
  ) {
    return
  }
  updateUsage(
    workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    ) as WorkerUsage
  )
}
898
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * It shall when the worker information lists more than two task functions.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
913
/**
 * Updates the tasks counters of a worker usage once a task response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter is never decremented below zero
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
931
/**
 * Updates the run time statistics of a worker usage with the run time
 * reported in the received message, zero when it is not reported.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
943
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp, zero when the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - (task.timestamp ?? now),
    workerUsage.tasks.executed
  )
}
957
/**
 * Updates the ELU (event loop utilization) statistics of a worker usage
 * with the values reported in the received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // First value is taken as is, later ones are averaged with the current value
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
989
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker node gets created and the strategy policy allows its
 * direct usage, it is chosen; otherwise the worker choice strategy decides.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const { dynamicWorkerUsage } =
      this.workerChoiceStrategyContext.getStrategyPolicy()
    if (dynamicWorkerUsage) {
      return dynamicWorkerNodeKey
    }
  }
  return this.workerChoiceStrategyContext.execute()
}
1008
/**
 * Conditions for dynamic worker creation:
 * the pool is dynamic, not full and all its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1017
/**
 * Sends a message to worker given its worker node key.
 *
 * Used by the pool to send, among others, kill and activity check messages.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1030
/**
 * Creates a new worker.
 *
 * Called by `createAndSetupWorkerNode()`, which then registers the event
 * handlers on the returned worker.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1037
/**
 * Creates a new, completely set up worker node.
 *
 * Creates the worker, registers the handlers from the pool options and the
 * internal error and exit handlers on it, adds it to the pool worker nodes
 * and finally runs `afterWorkerNodeSetup()`.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // Handlers from the pool options, falling back to `EMPTY_FUNCTION` when not provided
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling, registered after the error handler from the pool options
  worker.on('error', (error) => {
    // The key is resolved at error time, not at registration time
    const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    workerInfo.ready = false
    this.workerNodes[workerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement is only created once the pool has finished starting and is not destroyed
    if (
      this.opts.restartWorkerOnError === true &&
      !this.starting &&
      this.started
    ) {
      // The replacement has the same kind (dynamic or not) as the errored worker
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    // Runs after the optional replacement worker node creation
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // On exit, the worker is removed from the pool worker nodes (one-shot listener)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
1081
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, it registers a listener handling
 * the kill messages sent by the worker, sends a `checkActive` message to the
 * worker and flags the worker node as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    // The key is looked up again from the worker id carried by each message
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker
    // HARD: destroy unconditionally. SOFT: destroy only when no task is
    // executing and, with the tasks queue enabled, when its queue is empty.
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        ((this.opts.enableTasksQueue === false &&
          workerUsage.tasks.executing === 0) ||
          (this.opts.enableTasksQueue === true &&
            workerUsage.tasks.executing === 0 &&
            this.tasksQueueSize(localWorkerNodeKey) === 0)))
    ) {
      // Destruction failures are reported through the pool error event
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // The strategy policy may require the dynamic worker node to be flagged ready right away
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1124
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 * Has to be implemented by the concrete pool class.
 *
 * @typeParam Message - Type of the message payload: either the task data or the execution response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The callback invoked with each message received from the worker.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1137
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * It registers the pool message listener on the worker, sends it the startup
 * and the statistics messages and, when the tasks queue is enabled, installs
 * the tasks stealing callback invoked on back pressure.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const createdWorkerNode = this.workerNodes[workerNodeKey]
  createdWorkerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1156
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Has to be implemented by the concrete pool class.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1163
/**
 * Sends the statistics message to the worker identified by its worker node key.
 * The message carries the run time and ELU aggregation requirements of the
 * current worker choice strategy.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = (this.getWorkerInfo(workerNodeKey) as WorkerInfo)
    .id as number
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu },
    workerId
  })
}
1181
/**
 * Redistributes the tasks queued on the given worker node to the other ready
 * worker nodes.
 *
 * Each queued task goes to the first ready worker node found with an empty
 * tasks queue or, failing that, to the ready worker node with the fewest
 * queued tasks. It is executed right away if that destination is below the
 * configured tasks queue concurrency, enqueued on it otherwise.
 *
 * The redistribution stops if no other ready worker node is available: the
 * remaining tasks are left in the source tasks queue.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    // Iterate over the pool worker nodes themselves, not over a filtered copy:
    // the indexes of a filtered copy are shifted and are not worker node keys.
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (candidate.usage.tasks.queued === 0) {
        destinationWorkerNodeKey = candidateKey
        break
      }
      if (candidate.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = candidate.usage.tasks.queued
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No destination: moving the task back to its source would loop forever.
      break
    }
    // NOTE(review): the task is forwarded unchanged; if it embeds the source
    // worker identifier, confirm the destination worker accepts it.
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    // Only the load of the chosen destination decides between execution and queuing.
    if (
      this.workerNodes[destinationWorkerNodeKey].usage.tasks.executing <
      concurrency
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1222
/**
 * Moves tasks queued on the worker under back pressure to the other worker nodes.
 *
 * The other worker nodes are visited from the least to the most queued tasks.
 * Each one that is ready and not under back pressure itself receives one task
 * popped from the source: executed right away if the destination is below the
 * configured tasks queue concurrency, enqueued on it otherwise.
 *
 * @param workerId - The identifier of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    // Unknown worker identifier: nothing to steal from.
    return
  }
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  // Sort worker node keys rather than worker nodes: the indexes of a filtered
  // and sorted copy of the worker nodes are not valid worker node keys.
  const destinationWorkerNodeKeys = this.workerNodes
    .map((_, workerNodeKey) => workerNodeKey)
    .filter(
      (workerNodeKey) => this.workerNodes[workerNodeKey].info.id !== workerId
    )
    .sort(
      (workerNodeKeyA, workerNodeKeyB) =>
        this.workerNodes[workerNodeKeyA].usage.tasks.queued -
        this.workerNodes[workerNodeKeyB].usage.tasks.queued
    )
  for (const destinationWorkerNodeKey of destinationWorkerNodeKeys) {
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      // Nothing left to move.
      break
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    if (
      !destinationWorkerNode.info.ready ||
      destinationWorkerNode.hasBackPressure()
    ) {
      continue
    }
    const task = sourceWorkerNode.popTask()
    if (task == null) {
      break
    }
    if (destinationWorkerNode.usage.tasks.executing < concurrency) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1255
/**
 * Builds the listener registered for each worker message. The listener
 * dispatches the message according to its content: worker ready response,
 * task execution response or task functions update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const isReadyResponse =
      message.ready != null && message.taskFunctions != null
    if (isReadyResponse) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (message.taskFunctions != null) {
      // Task functions message received from worker
      const senderWorkerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ) as WorkerInfo
      senderWorkerInfo.taskFunctions = message.taskFunctions
    }
  }
}
1280
/**
 * Handles the ready response sent by a worker: stores its ready flag and its
 * task functions in the worker information, then emits the pool ready event
 * if an emitter is set and the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const readyWorkerInfo = this.getWorkerInfo(readyWorkerNodeKey) as WorkerInfo
  readyWorkerInfo.ready = ready as boolean
  readyWorkerInfo.taskFunctions = taskFunctions
  const poolEmitter = this.emitter
  if (poolEmitter != null && this.ready) {
    poolEmitter.emit(PoolEvents.ready, this.info)
  }
}
1294
/**
 * Handles a task execution response sent by a worker: settles the promise
 * registered for the task, runs the after task execution hook, then executes
 * the next queued task of the worker node if the tasks queue is enabled and
 * the worker node is below the configured concurrency.
 * A response for an unknown task identifier is ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { data, taskError } = message
  const settledTaskId = message.taskId as string
  const pendingResponse = this.promiseResponseMap.get(settledTaskId)
  if (pendingResponse == null) {
    return
  }
  if (taskError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingResponse.reject(taskError.message)
  }
  const executingWorkerNodeKey = pendingResponse.workerNodeKey
  this.afterTaskExecutionHook(executingWorkerNodeKey, message)
  this.promiseResponseMap.delete(settledTaskId)
  const shallExecuteNextQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(executingWorkerNodeKey) > 0 &&
    this.workerNodes[executingWorkerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteNextQueuedTask) {
    const nextTask = this.dequeueTask(executingWorkerNodeKey) as Task<Data>
    this.executeTask(executingWorkerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(executingWorkerNodeKey)
}
1322
/**
 * Emits the pool busy event, with the pool information, if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1328
/**
 * Emits the pool back pressure event, with the pool information, if the pool
 * is under back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1334
/**
 * Emits the pool full event, with the pool information, if the pool is
 * dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1342
/**
 * Looks up the information of the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if no worker node exists at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1352
/**
 * Wraps the given worker into a worker node and appends it to the pool worker nodes.
 * The tasks queue maximum size defaults to the square of the pool maximum size.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    this.opts.tasksQueueOptions?.queueMaxSize ?? this.maxSize ** 2
  )
  if (this.starting) {
    // Worker nodes added at pool startup are flagged as ready straight away.
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node added not found')
}
1377
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker is not
 * part of the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1390
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only makes sense when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1398
/**
 * Whether the pool is under back pressure: the tasks queue is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool is under back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1407
/**
 * Runs the before task execution hook, sends the task along with its transfer
 * list to the worker identified by its worker node key, then checks if task
 * execution events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // The task itself is the message sent to the worker.
  this.sendToWorker(workerNodeKey, task, task.transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1419
/**
 * Enqueues the given task in the tasks queue of the worker node identified by
 * its key, then checks if task queuing events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = destinationWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1425
/**
 * Dequeues a task from the tasks queue of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
1429
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const queriedWorkerNode = this.workerNodes[workerNodeKey]
  return queriedWorkerNode.tasksQueueSize()
}
1433
/**
 * Executes every task queued on the worker node identified by its key, then
 * clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const flushedWorkerNode = this.workerNodes[workerNodeKey]
  flushedWorkerNode.clearTasksQueue()
}
1443
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1449 }