feat: add tasks stealing algorithm
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Dynamic pool maximum size property placeholder.
89 */
90 protected readonly max?: number
91
92 /**
93 * Whether the pool is starting or not.
94 */
95 private readonly starting: boolean
96 /**
97 * The start timestamp of the pool.
98 */
99 private readonly startTimestamp
100
101 /**
102 * Constructs a new poolifier pool.
103 *
104 * @param numberOfWorkers - Number of workers that this pool should manage.
105 * @param filePath - Path to the worker file.
106 * @param opts - Options for the pool.
107 */
108 public constructor (
109 protected readonly numberOfWorkers: number,
110 protected readonly filePath: string,
111 protected readonly opts: PoolOptions<Worker>
112 ) {
113 if (!this.isMain()) {
114 throw new Error(
115 'Cannot start a pool from a worker with the same type as the pool'
116 )
117 }
118 this.checkNumberOfWorkers(this.numberOfWorkers)
119 this.checkFilePath(this.filePath)
120 this.checkPoolOptions(this.opts)
121
122 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
123 this.executeTask = this.executeTask.bind(this)
124 this.enqueueTask = this.enqueueTask.bind(this)
125
126 if (this.opts.enableEvents === true) {
127 this.emitter = new PoolEmitter()
128 }
129 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
130 Worker,
131 Data,
132 Response
133 >(
134 this,
135 this.opts.workerChoiceStrategy,
136 this.opts.workerChoiceStrategyOptions
137 )
138
139 this.setupHook()
140
141 this.starting = true
142 this.startPool()
143 this.starting = false
144
145 this.startTimestamp = performance.now()
146 }
147
148 private checkFilePath (filePath: string): void {
149 if (
150 filePath == null ||
151 typeof filePath !== 'string' ||
152 (typeof filePath === 'string' && filePath.trim().length === 0)
153 ) {
154 throw new Error('Please specify a file with a worker implementation')
155 }
156 if (!existsSync(filePath)) {
157 throw new Error(`Cannot find the worker file '${filePath}'`)
158 }
159 }
160
161 private checkNumberOfWorkers (numberOfWorkers: number): void {
162 if (numberOfWorkers == null) {
163 throw new Error(
164 'Cannot instantiate a pool without specifying the number of workers'
165 )
166 } else if (!Number.isSafeInteger(numberOfWorkers)) {
167 throw new TypeError(
168 'Cannot instantiate a pool with a non safe integer number of workers'
169 )
170 } else if (numberOfWorkers < 0) {
171 throw new RangeError(
172 'Cannot instantiate a pool with a negative number of workers'
173 )
174 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
175 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
176 }
177 }
178
179 protected checkDynamicPoolSize (min: number, max: number): void {
180 if (this.type === PoolTypes.dynamic) {
181 if (max == null) {
182 throw new TypeError(
183 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
184 )
185 } else if (!Number.isSafeInteger(max)) {
186 throw new TypeError(
187 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
188 )
189 } else if (min > max) {
190 throw new RangeError(
191 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
192 )
193 } else if (max === 0) {
194 throw new RangeError(
195 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
196 )
197 } else if (min === max) {
198 throw new RangeError(
199 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
200 )
201 }
202 }
203 }
204
205 private checkPoolOptions (opts: PoolOptions<Worker>): void {
206 if (isPlainObject(opts)) {
207 this.opts.workerChoiceStrategy =
208 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
209 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
210 this.opts.workerChoiceStrategyOptions = {
211 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
212 ...opts.workerChoiceStrategyOptions
213 }
214 this.checkValidWorkerChoiceStrategyOptions(
215 this.opts.workerChoiceStrategyOptions
216 )
217 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
218 this.opts.enableEvents = opts.enableEvents ?? true
219 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
220 if (this.opts.enableTasksQueue) {
221 this.checkValidTasksQueueOptions(
222 opts.tasksQueueOptions as TasksQueueOptions
223 )
224 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
225 opts.tasksQueueOptions as TasksQueueOptions
226 )
227 }
228 } else {
229 throw new TypeError('Invalid pool options: must be a plain object')
230 }
231 }
232
233 private checkValidWorkerChoiceStrategy (
234 workerChoiceStrategy: WorkerChoiceStrategy
235 ): void {
236 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
237 throw new Error(
238 `Invalid worker choice strategy '${workerChoiceStrategy}'`
239 )
240 }
241 }
242
243 private checkValidWorkerChoiceStrategyOptions (
244 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
245 ): void {
246 if (!isPlainObject(workerChoiceStrategyOptions)) {
247 throw new TypeError(
248 'Invalid worker choice strategy options: must be a plain object'
249 )
250 }
251 if (
252 workerChoiceStrategyOptions.choiceRetries != null &&
253 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
254 ) {
255 throw new TypeError(
256 'Invalid worker choice strategy options: choice retries must be an integer'
257 )
258 }
259 if (
260 workerChoiceStrategyOptions.choiceRetries != null &&
261 workerChoiceStrategyOptions.choiceRetries <= 0
262 ) {
263 throw new RangeError(
264 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
265 )
266 }
267 if (
268 workerChoiceStrategyOptions.weights != null &&
269 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
270 ) {
271 throw new Error(
272 'Invalid worker choice strategy options: must have a weight for each worker node'
273 )
274 }
275 if (
276 workerChoiceStrategyOptions.measurement != null &&
277 !Object.values(Measurements).includes(
278 workerChoiceStrategyOptions.measurement
279 )
280 ) {
281 throw new Error(
282 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
283 )
284 }
285 }
286
287 private checkValidTasksQueueOptions (
288 tasksQueueOptions: TasksQueueOptions
289 ): void {
290 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
291 throw new TypeError('Invalid tasks queue options: must be a plain object')
292 }
293 if (
294 tasksQueueOptions?.concurrency != null &&
295 !Number.isSafeInteger(tasksQueueOptions.concurrency)
296 ) {
297 throw new TypeError(
298 'Invalid worker node tasks concurrency: must be an integer'
299 )
300 }
301 if (
302 tasksQueueOptions?.concurrency != null &&
303 tasksQueueOptions.concurrency <= 0
304 ) {
305 throw new RangeError(
306 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
307 )
308 }
309 if (
310 tasksQueueOptions?.queueMaxSize != null &&
311 !Number.isSafeInteger(tasksQueueOptions.queueMaxSize)
312 ) {
313 throw new TypeError(
314 'Invalid worker node tasks queue max size: must be an integer'
315 )
316 }
317 if (
318 tasksQueueOptions?.queueMaxSize != null &&
319 tasksQueueOptions.queueMaxSize <= 0
320 ) {
321 throw new RangeError(
322 `Invalid worker node tasks queue max size: ${tasksQueueOptions.queueMaxSize} is a negative integer or zero`
323 )
324 }
325 }
326
327 private startPool (): void {
328 while (
329 this.workerNodes.reduce(
330 (accumulator, workerNode) =>
331 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
332 0
333 ) < this.numberOfWorkers
334 ) {
335 this.createAndSetupWorkerNode()
336 }
337 }
338
339 /** @inheritDoc */
340 public get info (): PoolInfo {
341 return {
342 version,
343 type: this.type,
344 worker: this.worker,
345 ready: this.ready,
346 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
347 minSize: this.minSize,
348 maxSize: this.maxSize,
349 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
350 .runTime.aggregate &&
351 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
352 .waitTime.aggregate && { utilization: round(this.utilization) }),
353 workerNodes: this.workerNodes.length,
354 idleWorkerNodes: this.workerNodes.reduce(
355 (accumulator, workerNode) =>
356 workerNode.usage.tasks.executing === 0
357 ? accumulator + 1
358 : accumulator,
359 0
360 ),
361 busyWorkerNodes: this.workerNodes.reduce(
362 (accumulator, workerNode) =>
363 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
364 0
365 ),
366 executedTasks: this.workerNodes.reduce(
367 (accumulator, workerNode) =>
368 accumulator + workerNode.usage.tasks.executed,
369 0
370 ),
371 executingTasks: this.workerNodes.reduce(
372 (accumulator, workerNode) =>
373 accumulator + workerNode.usage.tasks.executing,
374 0
375 ),
376 ...(this.opts.enableTasksQueue === true && {
377 queuedTasks: this.workerNodes.reduce(
378 (accumulator, workerNode) =>
379 accumulator + workerNode.usage.tasks.queued,
380 0
381 )
382 }),
383 ...(this.opts.enableTasksQueue === true && {
384 maxQueuedTasks: this.workerNodes.reduce(
385 (accumulator, workerNode) =>
386 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
387 0
388 )
389 }),
390 ...(this.opts.enableTasksQueue === true && {
391 backPressure: this.hasBackPressure()
392 }),
393 failedTasks: this.workerNodes.reduce(
394 (accumulator, workerNode) =>
395 accumulator + workerNode.usage.tasks.failed,
396 0
397 ),
398 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
399 .runTime.aggregate && {
400 runTime: {
401 minimum: round(
402 Math.min(
403 ...this.workerNodes.map(
404 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
405 )
406 )
407 ),
408 maximum: round(
409 Math.max(
410 ...this.workerNodes.map(
411 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
412 )
413 )
414 ),
415 average: round(
416 this.workerNodes.reduce(
417 (accumulator, workerNode) =>
418 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
419 0
420 ) /
421 this.workerNodes.reduce(
422 (accumulator, workerNode) =>
423 accumulator + (workerNode.usage.tasks?.executed ?? 0),
424 0
425 )
426 ),
427 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
428 .runTime.median && {
429 median: round(
430 median(
431 this.workerNodes.map(
432 (workerNode) => workerNode.usage.runTime?.median ?? 0
433 )
434 )
435 )
436 })
437 }
438 }),
439 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
440 .waitTime.aggregate && {
441 waitTime: {
442 minimum: round(
443 Math.min(
444 ...this.workerNodes.map(
445 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
446 )
447 )
448 ),
449 maximum: round(
450 Math.max(
451 ...this.workerNodes.map(
452 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
453 )
454 )
455 ),
456 average: round(
457 this.workerNodes.reduce(
458 (accumulator, workerNode) =>
459 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
460 0
461 ) /
462 this.workerNodes.reduce(
463 (accumulator, workerNode) =>
464 accumulator + (workerNode.usage.tasks?.executed ?? 0),
465 0
466 )
467 ),
468 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
469 .waitTime.median && {
470 median: round(
471 median(
472 this.workerNodes.map(
473 (workerNode) => workerNode.usage.waitTime?.median ?? 0
474 )
475 )
476 )
477 })
478 }
479 })
480 }
481 }
482
483 /**
484 * The pool readiness boolean status.
485 */
486 private get ready (): boolean {
487 return (
488 this.workerNodes.reduce(
489 (accumulator, workerNode) =>
490 !workerNode.info.dynamic && workerNode.info.ready
491 ? accumulator + 1
492 : accumulator,
493 0
494 ) >= this.minSize
495 )
496 }
497
498 /**
499 * The approximate pool utilization.
500 *
501 * @returns The pool utilization.
502 */
503 private get utilization (): number {
504 const poolTimeCapacity =
505 (performance.now() - this.startTimestamp) * this.maxSize
506 const totalTasksRunTime = this.workerNodes.reduce(
507 (accumulator, workerNode) =>
508 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
509 0
510 )
511 const totalTasksWaitTime = this.workerNodes.reduce(
512 (accumulator, workerNode) =>
513 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
514 0
515 )
516 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
517 }
518
519 /**
520 * The pool type.
521 *
522 * If it is `'dynamic'`, it provides the `max` property.
523 */
524 protected abstract get type (): PoolType
525
526 /**
527 * The worker type.
528 */
529 protected abstract get worker (): WorkerType
530
531 /**
532 * The pool minimum size.
533 */
534 protected get minSize (): number {
535 return this.numberOfWorkers
536 }
537
538 /**
539 * The pool maximum size.
540 */
541 protected get maxSize (): number {
542 return this.max ?? this.numberOfWorkers
543 }
544
545 /**
546 * Checks if the worker id sent in the received message from a worker is valid.
547 *
548 * @param message - The received message.
549 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
550 */
551 private checkMessageWorkerId (message: MessageValue<Response>): void {
552 if (message.workerId == null) {
553 throw new Error('Worker message received without worker id')
554 } else if (
555 message.workerId != null &&
556 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
557 ) {
558 throw new Error(
559 `Worker message received from unknown worker '${message.workerId}'`
560 )
561 }
562 }
563
564 /**
565 * Gets the given worker its worker node key.
566 *
567 * @param worker - The worker.
568 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
569 */
570 private getWorkerNodeKeyByWorker (worker: Worker): number {
571 return this.workerNodes.findIndex(
572 (workerNode) => workerNode.worker === worker
573 )
574 }
575
576 /**
577 * Gets the worker node key given its worker id.
578 *
579 * @param workerId - The worker id.
580 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
581 */
582 private getWorkerNodeKeyByWorkerId (workerId: number): number {
583 return this.workerNodes.findIndex(
584 (workerNode) => workerNode.info.id === workerId
585 )
586 }
587
588 /** @inheritDoc */
589 public setWorkerChoiceStrategy (
590 workerChoiceStrategy: WorkerChoiceStrategy,
591 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
592 ): void {
593 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
594 this.opts.workerChoiceStrategy = workerChoiceStrategy
595 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
596 this.opts.workerChoiceStrategy
597 )
598 if (workerChoiceStrategyOptions != null) {
599 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
600 }
601 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
602 workerNode.resetUsage()
603 this.sendStatisticsMessageToWorker(workerNodeKey)
604 }
605 }
606
607 /** @inheritDoc */
608 public setWorkerChoiceStrategyOptions (
609 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
610 ): void {
611 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
612 this.opts.workerChoiceStrategyOptions = {
613 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
614 ...workerChoiceStrategyOptions
615 }
616 this.workerChoiceStrategyContext.setOptions(
617 this.opts.workerChoiceStrategyOptions
618 )
619 }
620
621 /** @inheritDoc */
622 public enableTasksQueue (
623 enable: boolean,
624 tasksQueueOptions?: TasksQueueOptions
625 ): void {
626 if (this.opts.enableTasksQueue === true && !enable) {
627 this.flushTasksQueues()
628 }
629 this.opts.enableTasksQueue = enable
630 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
631 }
632
633 /** @inheritDoc */
634 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
635 if (this.opts.enableTasksQueue === true) {
636 this.checkValidTasksQueueOptions(tasksQueueOptions)
637 this.opts.tasksQueueOptions =
638 this.buildTasksQueueOptions(tasksQueueOptions)
639 this.setTasksQueueMaxSize(
640 this.opts.tasksQueueOptions.queueMaxSize as number
641 )
642 } else if (this.opts.tasksQueueOptions != null) {
643 delete this.opts.tasksQueueOptions
644 }
645 }
646
647 private setTasksQueueMaxSize (queueMaxSize: number): void {
648 for (const workerNode of this.workerNodes) {
649 workerNode.tasksQueueBackPressureSize = queueMaxSize
650 }
651 }
652
653 private buildTasksQueueOptions (
654 tasksQueueOptions: TasksQueueOptions
655 ): TasksQueueOptions {
656 return {
657 ...{
658 queueMaxSize: Math.pow(this.maxSize, 2),
659 concurrency: 1
660 },
661 ...tasksQueueOptions
662 }
663 }
664
665 /**
666 * Whether the pool is full or not.
667 *
668 * The pool filling boolean status.
669 */
670 protected get full (): boolean {
671 return this.workerNodes.length >= this.maxSize
672 }
673
674 /**
675 * Whether the pool is busy or not.
676 *
677 * The pool busyness boolean status.
678 */
679 protected abstract get busy (): boolean
680
681 /**
682 * Whether worker nodes are executing concurrently their tasks quota or not.
683 *
684 * @returns Worker nodes busyness boolean status.
685 */
686 protected internalBusy (): boolean {
687 if (this.opts.enableTasksQueue === true) {
688 return (
689 this.workerNodes.findIndex(
690 (workerNode) =>
691 workerNode.info.ready &&
692 workerNode.usage.tasks.executing <
693 (this.opts.tasksQueueOptions?.concurrency as number)
694 ) === -1
695 )
696 } else {
697 return (
698 this.workerNodes.findIndex(
699 (workerNode) =>
700 workerNode.info.ready && workerNode.usage.tasks.executing === 0
701 ) === -1
702 )
703 }
704 }
705
706 /** @inheritDoc */
707 public listTaskFunctions (): string[] {
708 for (const workerNode of this.workerNodes) {
709 if (
710 Array.isArray(workerNode.info.taskFunctions) &&
711 workerNode.info.taskFunctions.length > 0
712 ) {
713 return workerNode.info.taskFunctions
714 }
715 }
716 return []
717 }
718
719 /** @inheritDoc */
720 public async execute (
721 data?: Data,
722 name?: string,
723 transferList?: TransferListItem[]
724 ): Promise<Response> {
725 return await new Promise<Response>((resolve, reject) => {
726 if (name != null && typeof name !== 'string') {
727 reject(new TypeError('name argument must be a string'))
728 }
729 if (
730 name != null &&
731 typeof name === 'string' &&
732 name.trim().length === 0
733 ) {
734 reject(new TypeError('name argument must not be an empty string'))
735 }
736 if (transferList != null && !Array.isArray(transferList)) {
737 reject(new TypeError('transferList argument must be an array'))
738 }
739 const timestamp = performance.now()
740 const workerNodeKey = this.chooseWorkerNode()
741 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
742 if (
743 name != null &&
744 Array.isArray(workerInfo.taskFunctions) &&
745 !workerInfo.taskFunctions.includes(name)
746 ) {
747 reject(
748 new Error(`Task function '${name}' is not registered in the pool`)
749 )
750 }
751 const task: Task<Data> = {
752 name: name ?? DEFAULT_TASK_NAME,
753 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
754 data: data ?? ({} as Data),
755 transferList,
756 timestamp,
757 workerId: workerInfo.id as number,
758 taskId: randomUUID()
759 }
760 this.promiseResponseMap.set(task.taskId as string, {
761 resolve,
762 reject,
763 workerNodeKey
764 })
765 if (
766 this.opts.enableTasksQueue === false ||
767 (this.opts.enableTasksQueue === true &&
768 this.workerNodes[workerNodeKey].usage.tasks.executing <
769 (this.opts.tasksQueueOptions?.concurrency as number))
770 ) {
771 this.executeTask(workerNodeKey, task)
772 } else {
773 this.enqueueTask(workerNodeKey, task)
774 }
775 })
776 }
777
778 /** @inheritDoc */
779 public async destroy (): Promise<void> {
780 await Promise.all(
781 this.workerNodes.map(async (_, workerNodeKey) => {
782 await this.destroyWorkerNode(workerNodeKey)
783 })
784 )
785 this.emitter?.emit(PoolEvents.destroy, this.info)
786 }
787
788 protected async sendKillMessageToWorker (
789 workerNodeKey: number,
790 workerId: number
791 ): Promise<void> {
792 await new Promise<void>((resolve, reject) => {
793 this.registerWorkerMessageListener(workerNodeKey, (message) => {
794 if (message.kill === 'success') {
795 resolve()
796 } else if (message.kill === 'failure') {
797 reject(new Error(`Worker ${workerId} kill message handling failed`))
798 }
799 })
800 this.sendToWorker(workerNodeKey, { kill: true, workerId })
801 })
802 }
803
804 /**
805 * Terminates the worker node given its worker node key.
806 *
807 * @param workerNodeKey - The worker node key.
808 */
809 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
810
811 /**
812 * Setup hook to execute code before worker nodes are created in the abstract constructor.
813 * Can be overridden.
814 *
815 * @virtual
816 */
817 protected setupHook (): void {
818 // Intentionally empty
819 }
820
821 /**
822 * Should return whether the worker is the main worker or not.
823 */
824 protected abstract isMain (): boolean
825
826 /**
827 * Hook executed before the worker task execution.
828 * Can be overridden.
829 *
830 * @param workerNodeKey - The worker node key.
831 * @param task - The task to execute.
832 */
833 protected beforeTaskExecutionHook (
834 workerNodeKey: number,
835 task: Task<Data>
836 ): void {
837 if (this.workerNodes[workerNodeKey]?.usage != null) {
838 const workerUsage = this.workerNodes[workerNodeKey].usage
839 ++workerUsage.tasks.executing
840 this.updateWaitTimeWorkerUsage(workerUsage, task)
841 }
842 if (
843 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
844 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
845 task.name as string
846 ) != null
847 ) {
848 const taskFunctionWorkerUsage = this.workerNodes[
849 workerNodeKey
850 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
851 ++taskFunctionWorkerUsage.tasks.executing
852 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
853 }
854 }
855
856 /**
857 * Hook executed after the worker task execution.
858 * Can be overridden.
859 *
860 * @param workerNodeKey - The worker node key.
861 * @param message - The received message.
862 */
863 protected afterTaskExecutionHook (
864 workerNodeKey: number,
865 message: MessageValue<Response>
866 ): void {
867 if (this.workerNodes[workerNodeKey]?.usage != null) {
868 const workerUsage = this.workerNodes[workerNodeKey].usage
869 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
870 this.updateRunTimeWorkerUsage(workerUsage, message)
871 this.updateEluWorkerUsage(workerUsage, message)
872 }
873 if (
874 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
875 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
876 message.taskPerformance?.name as string
877 ) != null
878 ) {
879 const taskFunctionWorkerUsage = this.workerNodes[
880 workerNodeKey
881 ].getTaskFunctionWorkerUsage(
882 message.taskPerformance?.name as string
883 ) as WorkerUsage
884 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
885 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
886 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
887 }
888 }
889
890 /**
891 * Whether the worker node shall update its task function worker usage or not.
892 *
893 * @param workerNodeKey - The worker node key.
894 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
895 */
896 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
897 const workerInfo = this.getWorkerInfo(workerNodeKey)
898 return (
899 workerInfo != null &&
900 Array.isArray(workerInfo.taskFunctions) &&
901 workerInfo.taskFunctions.length > 2
902 )
903 }
904
905 private updateTaskStatisticsWorkerUsage (
906 workerUsage: WorkerUsage,
907 message: MessageValue<Response>
908 ): void {
909 const workerTaskStatistics = workerUsage.tasks
910 if (
911 workerTaskStatistics.executing != null &&
912 workerTaskStatistics.executing > 0
913 ) {
914 --workerTaskStatistics.executing
915 } else if (
916 workerTaskStatistics.executing != null &&
917 workerTaskStatistics.executing < 0
918 ) {
919 throw new Error(
920 'Worker usage statistic for tasks executing cannot be negative'
921 )
922 }
923 if (message.taskError == null) {
924 ++workerTaskStatistics.executed
925 } else {
926 ++workerTaskStatistics.failed
927 }
928 }
929
930 private updateRunTimeWorkerUsage (
931 workerUsage: WorkerUsage,
932 message: MessageValue<Response>
933 ): void {
934 updateMeasurementStatistics(
935 workerUsage.runTime,
936 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
937 message.taskPerformance?.runTime ?? 0,
938 workerUsage.tasks.executed
939 )
940 }
941
942 private updateWaitTimeWorkerUsage (
943 workerUsage: WorkerUsage,
944 task: Task<Data>
945 ): void {
946 const timestamp = performance.now()
947 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
948 updateMeasurementStatistics(
949 workerUsage.waitTime,
950 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
951 taskWaitTime,
952 workerUsage.tasks.executed
953 )
954 }
955
956 private updateEluWorkerUsage (
957 workerUsage: WorkerUsage,
958 message: MessageValue<Response>
959 ): void {
960 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
961 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
962 updateMeasurementStatistics(
963 workerUsage.elu.active,
964 eluTaskStatisticsRequirements,
965 message.taskPerformance?.elu?.active ?? 0,
966 workerUsage.tasks.executed
967 )
968 updateMeasurementStatistics(
969 workerUsage.elu.idle,
970 eluTaskStatisticsRequirements,
971 message.taskPerformance?.elu?.idle ?? 0,
972 workerUsage.tasks.executed
973 )
974 if (eluTaskStatisticsRequirements.aggregate) {
975 if (message.taskPerformance?.elu != null) {
976 if (workerUsage.elu.utilization != null) {
977 workerUsage.elu.utilization =
978 (workerUsage.elu.utilization +
979 message.taskPerformance.elu.utilization) /
980 2
981 } else {
982 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
983 }
984 }
985 }
986 }
987
988 /**
989 * Chooses a worker node for the next task.
990 *
991 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
992 *
993 * @returns The chosen worker node key
994 */
995 private chooseWorkerNode (): number {
996 if (this.shallCreateDynamicWorker()) {
997 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
998 if (
999 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1000 ) {
1001 return workerNodeKey
1002 }
1003 }
1004 return this.workerChoiceStrategyContext.execute()
1005 }
1006
1007 /**
1008 * Conditions for dynamic worker creation.
1009 *
1010 * @returns Whether to create a dynamic worker or not.
1011 */
1012 private shallCreateDynamicWorker (): boolean {
1013 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1014 }
1015
1016 /**
1017 * Sends a message to worker given its worker node key.
1018 *
1019 * @param workerNodeKey - The worker node key.
1020 * @param message - The message.
1021 * @param transferList - The optional array of transferable objects.
1022 */
1023 protected abstract sendToWorker (
1024 workerNodeKey: number,
1025 message: MessageValue<Data>,
1026 transferList?: TransferListItem[]
1027 ): void
1028
1029 /**
1030 * Creates a new worker.
1031 *
1032 * @returns Newly created worker.
1033 */
1034 protected abstract createWorker (): Worker
1035
1036 /**
1037 * Creates a new, completely set up worker node.
1038 *
1039 * @returns New, completely set up worker node key.
1040 */
1041 protected createAndSetupWorkerNode (): number {
1042 const worker = this.createWorker()
1043
1044 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1045 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1046 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1047 worker.on('error', (error) => {
1048 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1049 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
1050 workerInfo.ready = false
1051 this.workerNodes[workerNodeKey].closeChannel()
1052 this.emitter?.emit(PoolEvents.error, error)
1053 if (this.opts.restartWorkerOnError === true && !this.starting) {
1054 if (workerInfo.dynamic) {
1055 this.createAndSetupDynamicWorkerNode()
1056 } else {
1057 this.createAndSetupWorkerNode()
1058 }
1059 }
1060 if (this.opts.enableTasksQueue === true) {
1061 this.redistributeQueuedTasks(workerNodeKey)
1062 }
1063 })
1064 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1065 worker.once('exit', () => {
1066 this.removeWorkerNode(worker)
1067 })
1068
1069 const workerNodeKey = this.addWorkerNode(worker)
1070
1071 this.afterWorkerNodeSetup(workerNodeKey)
1072
1073 return workerNodeKey
1074 }
1075
1076 /**
1077 * Creates a new, completely set up dynamic worker node.
1078 *
1079 * @returns New, completely set up dynamic worker node key.
1080 */
1081 protected createAndSetupDynamicWorkerNode (): number {
1082 const workerNodeKey = this.createAndSetupWorkerNode()
1083 this.registerWorkerMessageListener(workerNodeKey, (message) => {
1084 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1085 message.workerId
1086 )
1087 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1088 // Kill message received from worker
1089 if (
1090 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1091 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1092 ((this.opts.enableTasksQueue === false &&
1093 workerUsage.tasks.executing === 0) ||
1094 (this.opts.enableTasksQueue === true &&
1095 workerUsage.tasks.executing === 0 &&
1096 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1097 ) {
1098 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1099 this.emitter?.emit(PoolEvents.error, error)
1100 })
1101 }
1102 })
1103 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
1104 this.sendToWorker(workerNodeKey, {
1105 checkActive: true,
1106 workerId: workerInfo.id as number
1107 })
1108 workerInfo.dynamic = true
1109 if (
1110 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1111 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1112 ) {
1113 workerInfo.ready = true
1114 }
1115 this.checkAndEmitDynamicWorkerCreationEvents()
1116 return workerNodeKey
1117 }
1118
1119 /**
1120 * Registers a listener callback on the worker given its worker node key.
1121 *
1122 * @param workerNodeKey - The worker node key.
1123 * @param listener - The message listener callback.
1124 */
1125 protected abstract registerWorkerMessageListener<
1126 Message extends Data | Response
1127 >(
1128 workerNodeKey: number,
1129 listener: (message: MessageValue<Message>) => void
1130 ): void
1131
1132 /**
1133 * Method hooked up after a worker node has been newly created.
1134 * Can be overridden.
1135 *
1136 * @param workerNodeKey - The newly created worker node key.
1137 */
1138 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1139 // Listen to worker messages.
1140 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1141 // Send the startup message to worker.
1142 this.sendStartupMessageToWorker(workerNodeKey)
1143 // Send the statistics message to worker.
1144 this.sendStatisticsMessageToWorker(workerNodeKey)
1145 if (this.opts.enableTasksQueue === true) {
1146 this.workerNodes[workerNodeKey].onBackPressure =
1147 this.tasksStealingOnBackPressure.bind(this)
1148 }
1149 }
1150
1151 /**
1152 * Sends the startup message to worker given its worker node key.
1153 *
1154 * @param workerNodeKey - The worker node key.
1155 */
1156 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1157
1158 /**
1159 * Sends the statistics message to worker given its worker node key.
1160 *
1161 * @param workerNodeKey - The worker node key.
1162 */
1163 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1164 this.sendToWorker(workerNodeKey, {
1165 statistics: {
1166 runTime:
1167 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1168 .runTime.aggregate,
1169 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1170 .elu.aggregate
1171 },
1172 workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
1173 })
1174 }
1175
1176 private redistributeQueuedTasks (workerNodeKey: number): void {
1177 while (this.tasksQueueSize(workerNodeKey) > 0) {
1178 let targetWorkerNodeKey: number = workerNodeKey
1179 let minQueuedTasks = Infinity
1180 let executeTask = false
1181 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1182 if (
1183 this.workerNodes[workerNodeId].usage.tasks.executing <
1184 (this.opts.tasksQueueOptions?.concurrency as number)
1185 ) {
1186 executeTask = true
1187 }
1188 if (
1189 workerNodeId !== workerNodeKey &&
1190 workerNode.info.ready &&
1191 workerNode.usage.tasks.queued === 0
1192 ) {
1193 targetWorkerNodeKey = workerNodeId
1194 break
1195 }
1196 if (
1197 workerNodeId !== workerNodeKey &&
1198 workerNode.info.ready &&
1199 workerNode.usage.tasks.queued < minQueuedTasks
1200 ) {
1201 minQueuedTasks = workerNode.usage.tasks.queued
1202 targetWorkerNodeKey = workerNodeId
1203 }
1204 }
1205 if (executeTask) {
1206 this.executeTask(
1207 targetWorkerNodeKey,
1208 this.popTask(workerNodeKey) as Task<Data>
1209 )
1210 } else {
1211 this.enqueueTask(
1212 targetWorkerNodeKey,
1213 this.popTask(workerNodeKey) as Task<Data>
1214 )
1215 }
1216 }
1217 }
1218
1219 private tasksStealingOnBackPressure (workerId: number): void {
1220 const sourceWorkerNode =
1221 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1222 const workerNodes = this.workerNodes
1223 .filter((workerNode) => workerNode.info.id !== workerId)
1224 .sort(
1225 (workerNodeA, workerNodeB) =>
1226 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1227 )
1228 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1229 if (
1230 workerNode.info.ready &&
1231 sourceWorkerNode.usage.tasks.queued > 0 &&
1232 !workerNode.hasBackPressure() &&
1233 workerNode.usage.tasks.executing <
1234 (this.opts.tasksQueueOptions?.concurrency as number)
1235 ) {
1236 this.executeTask(
1237 workerNodeKey,
1238 sourceWorkerNode.popTask() as Task<Data>
1239 )
1240 } else if (
1241 workerNode.info.ready &&
1242 sourceWorkerNode.usage.tasks.queued > 0 &&
1243 !workerNode.hasBackPressure() &&
1244 workerNode.usage.tasks.executing >=
1245 (this.opts.tasksQueueOptions?.concurrency as number)
1246 ) {
1247 this.enqueueTask(
1248 workerNodeKey,
1249 sourceWorkerNode.popTask() as Task<Data>
1250 )
1251 }
1252 }
1253 }
1254
1255 /**
1256 * This method is the listener registered for each worker message.
1257 *
1258 * @returns The listener function to execute when a message is received from a worker.
1259 */
1260 protected workerListener (): (message: MessageValue<Response>) => void {
1261 return (message) => {
1262 this.checkMessageWorkerId(message)
1263 if (message.ready != null && message.taskFunctions != null) {
1264 // Worker ready response received from worker
1265 this.handleWorkerReadyResponse(message)
1266 } else if (message.taskId != null) {
1267 // Task execution response received from worker
1268 this.handleTaskExecutionResponse(message)
1269 } else if (message.taskFunctions != null) {
1270 // Task functions message received from worker
1271 (
1272 this.getWorkerInfo(
1273 this.getWorkerNodeKeyByWorkerId(message.workerId)
1274 ) as WorkerInfo
1275 ).taskFunctions = message.taskFunctions
1276 }
1277 }
1278 }
1279
1280 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1281 if (message.ready === false) {
1282 throw new Error(`Worker ${message.workerId} failed to initialize`)
1283 }
1284 const workerInfo = this.getWorkerInfo(
1285 this.getWorkerNodeKeyByWorkerId(message.workerId)
1286 ) as WorkerInfo
1287 workerInfo.ready = message.ready as boolean
1288 workerInfo.taskFunctions = message.taskFunctions
1289 if (this.emitter != null && this.ready) {
1290 this.emitter.emit(PoolEvents.ready, this.info)
1291 }
1292 }
1293
1294 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1295 const { taskId, taskError, data } = message
1296 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1297 if (promiseResponse != null) {
1298 if (taskError != null) {
1299 this.emitter?.emit(PoolEvents.taskError, taskError)
1300 promiseResponse.reject(taskError.message)
1301 } else {
1302 promiseResponse.resolve(data as Response)
1303 }
1304 const workerNodeKey = promiseResponse.workerNodeKey
1305 this.afterTaskExecutionHook(workerNodeKey, message)
1306 this.promiseResponseMap.delete(taskId as string)
1307 if (
1308 this.opts.enableTasksQueue === true &&
1309 this.tasksQueueSize(workerNodeKey) > 0 &&
1310 this.workerNodes[workerNodeKey].usage.tasks.executing <
1311 (this.opts.tasksQueueOptions?.concurrency as number)
1312 ) {
1313 this.executeTask(
1314 workerNodeKey,
1315 this.dequeueTask(workerNodeKey) as Task<Data>
1316 )
1317 }
1318 this.workerChoiceStrategyContext.update(workerNodeKey)
1319 }
1320 }
1321
1322 private checkAndEmitTaskExecutionEvents (): void {
1323 if (this.busy) {
1324 this.emitter?.emit(PoolEvents.busy, this.info)
1325 }
1326 }
1327
1328 private checkAndEmitTaskQueuingEvents (): void {
1329 if (this.hasBackPressure()) {
1330 this.emitter?.emit(PoolEvents.backPressure, this.info)
1331 }
1332 }
1333
1334 private checkAndEmitDynamicWorkerCreationEvents (): void {
1335 if (this.type === PoolTypes.dynamic) {
1336 if (this.full) {
1337 this.emitter?.emit(PoolEvents.full, this.info)
1338 }
1339 }
1340 }
1341
1342 /**
1343 * Gets the worker information given its worker node key.
1344 *
1345 * @param workerNodeKey - The worker node key.
1346 * @returns The worker information.
1347 */
1348 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1349 return this.workerNodes[workerNodeKey]?.info
1350 }
1351
1352 /**
1353 * Adds the given worker in the pool worker nodes.
1354 *
1355 * @param worker - The worker.
1356 * @returns The added worker node key.
1357 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1358 */
1359 private addWorkerNode (worker: Worker): number {
1360 const workerNode = new WorkerNode<Worker, Data>(
1361 worker,
1362 this.worker,
1363 this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2)
1364 )
1365 // Flag the worker node as ready at pool startup.
1366 if (this.starting) {
1367 workerNode.info.ready = true
1368 }
1369 this.workerNodes.push(workerNode)
1370 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1371 if (workerNodeKey === -1) {
1372 throw new Error('Worker node added not found')
1373 }
1374 return workerNodeKey
1375 }
1376
1377 /**
1378 * Removes the given worker from the pool worker nodes.
1379 *
1380 * @param worker - The worker.
1381 */
1382 private removeWorkerNode (worker: Worker): void {
1383 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1384 if (workerNodeKey !== -1) {
1385 this.workerNodes.splice(workerNodeKey, 1)
1386 this.workerChoiceStrategyContext.remove(workerNodeKey)
1387 }
1388 }
1389
1390 /** @inheritDoc */
1391 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1392 return (
1393 this.opts.enableTasksQueue === true &&
1394 this.workerNodes[workerNodeKey].hasBackPressure()
1395 )
1396 }
1397
1398 private hasBackPressure (): boolean {
1399 return (
1400 this.opts.enableTasksQueue === true &&
1401 this.workerNodes.findIndex(
1402 (workerNode) => !workerNode.hasBackPressure()
1403 ) === -1
1404 )
1405 }
1406
1407 /**
1408 * Executes the given task on the worker given its worker node key.
1409 *
1410 * @param workerNodeKey - The worker node key.
1411 * @param task - The task to execute.
1412 */
1413 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1414 this.beforeTaskExecutionHook(workerNodeKey, task)
1415 this.sendToWorker(workerNodeKey, task, task.transferList)
1416 this.checkAndEmitTaskExecutionEvents()
1417 }
1418
1419 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1420 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1421 this.checkAndEmitTaskQueuingEvents()
1422 return tasksQueueSize
1423 }
1424
1425 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1426 return this.workerNodes[workerNodeKey].dequeueTask()
1427 }
1428
1429 private popTask (workerNodeKey: number): Task<Data> | undefined {
1430 return this.workerNodes[workerNodeKey].popTask()
1431 }
1432
1433 private tasksQueueSize (workerNodeKey: number): number {
1434 return this.workerNodes[workerNodeKey].tasksQueueSize()
1435 }
1436
1437 protected flushTasksQueue (workerNodeKey: number): void {
1438 while (this.tasksQueueSize(workerNodeKey) > 0) {
1439 this.executeTask(
1440 workerNodeKey,
1441 this.dequeueTask(workerNodeKey) as Task<Data>
1442 )
1443 }
1444 this.workerNodes[workerNodeKey].clearTasksQueue()
1445 }
1446
1447 private flushTasksQueues (): void {
1448 for (const [workerNodeKey] of this.workerNodes.entries()) {
1449 this.flushTasksQueue(workerNodeKey)
1450 }
1451 }
1452 }