1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import { existsSync
} from
'node:fs'
4 import { type TransferListItem
} from
'node:worker_threads'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
21 updateMeasurementStatistics
23 import { KillBehaviors
} from
'../worker/worker-options'
24 import type { TaskFunction
} from
'../worker/task-functions'
33 type TasksQueueOptions
43 type MeasurementStatisticsRequirements
,
45 WorkerChoiceStrategies
,
46 type WorkerChoiceStrategy
,
47 type WorkerChoiceStrategyOptions
48 } from
'./selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
50 import { version
} from
'./version'
51 import { WorkerNode
} from
'./worker-node'
54 * Base class that implements some shared logic for all poolifier pools.
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
60 export abstract class AbstractPool
<
61 Worker
extends IWorker
,
64 > implements IPool
<Worker
, Data
, Response
> {
66 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
69 public readonly emitter
?: PoolEmitter
72 * The task execution response promise map:
73 * - `key`: The message id of each submitted task.
74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
78 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
79 new Map
<string, PromiseResponseWrapper
<Response
>>()
82 * Worker choice strategy context referencing a worker choice algorithm implementation.
84 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
91 * Dynamic pool maximum size property placeholder.
93 protected readonly max
?: number
96 * The task functions added at runtime map:
97 * - `key`: The task function name.
98 * - `value`: The task function itself.
100 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
103 * Whether the pool is started or not.
105 private started
: boolean
107 * Whether the pool is starting or not.
109 private starting
: boolean
111 * The start timestamp of the pool.
113 private readonly startTimestamp
116 * Constructs a new poolifier pool.
118 * @param numberOfWorkers - Number of workers that this pool should manage.
119 * @param filePath - Path to the worker file.
120 * @param opts - Options for the pool.
123 protected readonly numberOfWorkers
: number,
124 protected readonly filePath
: string,
125 protected readonly opts
: PoolOptions
<Worker
>
127 if (!this.isMain()) {
129 'Cannot start a pool from a worker with the same type as the pool'
132 this.checkNumberOfWorkers(this.numberOfWorkers
)
133 this.checkFilePath(this.filePath
)
134 this.checkPoolOptions(this.opts
)
136 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
137 this.executeTask
= this.executeTask
.bind(this)
138 this.enqueueTask
= this.enqueueTask
.bind(this)
140 if (this.opts
.enableEvents
=== true) {
141 this.emitter
= new PoolEmitter()
143 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
149 this.opts
.workerChoiceStrategy
,
150 this.opts
.workerChoiceStrategyOptions
155 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
158 this.starting
= false
159 if (this.opts
.startWorkers
=== true) {
163 this.startTimestamp
= performance
.now()
166 private checkFilePath (filePath
: string): void {
169 typeof filePath
!== 'string' ||
170 (typeof filePath
=== 'string' && filePath
.trim().length
=== 0)
172 throw new Error('Please specify a file with a worker implementation')
174 if (!existsSync(filePath
)) {
175 throw new Error(`Cannot find the worker file '${filePath}'`)
179 private checkNumberOfWorkers (numberOfWorkers
: number): void {
180 if (numberOfWorkers
== null) {
182 'Cannot instantiate a pool without specifying the number of workers'
184 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
186 'Cannot instantiate a pool with a non safe integer number of workers'
188 } else if (numberOfWorkers
< 0) {
189 throw new RangeError(
190 'Cannot instantiate a pool with a negative number of workers'
192 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
193 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
197 protected checkDynamicPoolSize (min
: number, max
: number): void {
198 if (this.type === PoolTypes
.dynamic
) {
201 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
203 } else if (!Number.isSafeInteger(max
)) {
205 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
207 } else if (min
> max
) {
208 throw new RangeError(
209 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
211 } else if (max
=== 0) {
212 throw new RangeError(
213 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
215 } else if (min
=== max
) {
216 throw new RangeError(
217 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
223 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
224 if (isPlainObject(opts
)) {
225 this.opts
.startWorkers
= opts
.startWorkers
?? true
226 this.opts
.workerChoiceStrategy
=
227 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
228 this.checkValidWorkerChoiceStrategy(this.opts
.workerChoiceStrategy
)
229 this.opts
.workerChoiceStrategyOptions
= {
230 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
231 ...opts
.workerChoiceStrategyOptions
233 this.checkValidWorkerChoiceStrategyOptions(
234 this.opts
.workerChoiceStrategyOptions
236 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
237 this.opts
.enableEvents
= opts
.enableEvents
?? true
238 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
239 if (this.opts
.enableTasksQueue
) {
240 this.checkValidTasksQueueOptions(
241 opts
.tasksQueueOptions
as TasksQueueOptions
243 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
244 opts
.tasksQueueOptions
as TasksQueueOptions
248 throw new TypeError('Invalid pool options: must be a plain object')
252 private checkValidWorkerChoiceStrategy (
253 workerChoiceStrategy
: WorkerChoiceStrategy
255 if (!Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)) {
257 `Invalid worker choice strategy '${workerChoiceStrategy}'`
262 private checkValidWorkerChoiceStrategyOptions (
263 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
265 if (!isPlainObject(workerChoiceStrategyOptions
)) {
267 'Invalid worker choice strategy options: must be a plain object'
271 workerChoiceStrategyOptions
.retries
!= null &&
272 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
275 'Invalid worker choice strategy options: retries must be an integer'
279 workerChoiceStrategyOptions
.retries
!= null &&
280 workerChoiceStrategyOptions
.retries
< 0
282 throw new RangeError(
283 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
287 workerChoiceStrategyOptions
.weights
!= null &&
288 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
291 'Invalid worker choice strategy options: must have a weight for each worker node'
295 workerChoiceStrategyOptions
.measurement
!= null &&
296 !Object.values(Measurements
).includes(
297 workerChoiceStrategyOptions
.measurement
301 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
306 private checkValidTasksQueueOptions (
307 tasksQueueOptions
: TasksQueueOptions
309 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
310 throw new TypeError('Invalid tasks queue options: must be a plain object')
313 tasksQueueOptions
?.concurrency
!= null &&
314 !Number.isSafeInteger(tasksQueueOptions
?.concurrency
)
317 'Invalid worker node tasks concurrency: must be an integer'
321 tasksQueueOptions
?.concurrency
!= null &&
322 tasksQueueOptions
?.concurrency
<= 0
324 throw new RangeError(
325 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
329 tasksQueueOptions
?.size
!= null &&
330 !Number.isSafeInteger(tasksQueueOptions
?.size
)
333 'Invalid worker node tasks queue size: must be an integer'
336 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
?.size
<= 0) {
337 throw new RangeError(
338 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
344 public get
info (): PoolInfo
{
349 started
: this.started
,
351 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
352 minSize
: this.minSize
,
353 maxSize
: this.maxSize
,
354 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
355 .runTime
.aggregate
&&
356 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
357 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
358 workerNodes
: this.workerNodes
.length
,
359 idleWorkerNodes
: this.workerNodes
.reduce(
360 (accumulator
, workerNode
) =>
361 workerNode
.usage
.tasks
.executing
=== 0
366 busyWorkerNodes
: this.workerNodes
.reduce(
367 (accumulator
, workerNode
) =>
368 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
371 executedTasks
: this.workerNodes
.reduce(
372 (accumulator
, workerNode
) =>
373 accumulator
+ workerNode
.usage
.tasks
.executed
,
376 executingTasks
: this.workerNodes
.reduce(
377 (accumulator
, workerNode
) =>
378 accumulator
+ workerNode
.usage
.tasks
.executing
,
381 ...(this.opts
.enableTasksQueue
=== true && {
382 queuedTasks
: this.workerNodes
.reduce(
383 (accumulator
, workerNode
) =>
384 accumulator
+ workerNode
.usage
.tasks
.queued
,
388 ...(this.opts
.enableTasksQueue
=== true && {
389 maxQueuedTasks
: this.workerNodes
.reduce(
390 (accumulator
, workerNode
) =>
391 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
395 ...(this.opts
.enableTasksQueue
=== true && {
396 backPressure
: this.hasBackPressure()
398 ...(this.opts
.enableTasksQueue
=== true && {
399 stolenTasks
: this.workerNodes
.reduce(
400 (accumulator
, workerNode
) =>
401 accumulator
+ workerNode
.usage
.tasks
.stolen
,
405 failedTasks
: this.workerNodes
.reduce(
406 (accumulator
, workerNode
) =>
407 accumulator
+ workerNode
.usage
.tasks
.failed
,
410 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
411 .runTime
.aggregate
&& {
415 ...this.workerNodes
.map(
416 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
422 ...this.workerNodes
.map(
423 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
427 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
428 .runTime
.average
&& {
431 this.workerNodes
.reduce
<number[]>(
432 (accumulator
, workerNode
) =>
433 accumulator
.concat(workerNode
.usage
.runTime
.history
),
439 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
443 this.workerNodes
.reduce
<number[]>(
444 (accumulator
, workerNode
) =>
445 accumulator
.concat(workerNode
.usage
.runTime
.history
),
453 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
454 .waitTime
.aggregate
&& {
458 ...this.workerNodes
.map(
459 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
465 ...this.workerNodes
.map(
466 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
470 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
471 .waitTime
.average
&& {
474 this.workerNodes
.reduce
<number[]>(
475 (accumulator
, workerNode
) =>
476 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
482 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
483 .waitTime
.median
&& {
486 this.workerNodes
.reduce
<number[]>(
487 (accumulator
, workerNode
) =>
488 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
500 * The pool readiness boolean status.
502 private get
ready (): boolean {
504 this.workerNodes
.reduce(
505 (accumulator
, workerNode
) =>
506 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
/**
 * Approximate pool utilization.
 *
 * Computed as the sum of the aggregated tasks run time and wait time of all
 * worker nodes, divided by the pool time capacity (time elapsed since the
 * pool start timestamp multiplied by the pool maximum size).
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    // Worker nodes without aggregated measurements contribute nothing.
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  const poolTimeCapacity = elapsedTime * this.maxSize
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
538 * If it is `'dynamic'`, it provides the `max` property.
540 protected abstract get
type (): PoolType
545 protected abstract get
worker (): WorkerType
/**
 * Minimum size of the pool.
 *
 * @returns The number of workers the pool was constructed with.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
/**
 * Maximum size of the pool.
 *
 * @returns The `max` property when it is set, the number of workers otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
/**
 * Validates the worker id carried by a message received from a worker.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not belong to any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // A lookup result of -1 means no worker node holds this worker id.
  const isUnknownWorker = this.getWorkerNodeKeyByWorkerId(workerId) === -1
  if (isUnknownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const { workerNodes } = this
  return workerNodes.findIndex(
    ({ worker: nodeWorker }) => nodeWorker === worker
  )
}
/**
 * Looks up the worker node key matching the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  const { workerNodes } = this
  return workerNodes.findIndex(({ info }) => info.id === workerId)
}
/**
 * Sets the worker choice strategy used by the pool.
 *
 * The strategy is validated, stored in the pool options and propagated to the
 * worker choice strategy context. Afterwards every worker node usage is reset
 * and the statistics message is sent again to each worker.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @param workerChoiceStrategyOptions - The optional worker choice strategy options, applied when provided.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
/**
 * Sets the worker choice strategy options used by the pool.
 *
 * The given options are validated, merged over the default worker choice
 * strategy options, stored in the pool options and handed to the worker
 * choice strategy context.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // User supplied values take precedence over the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
/**
 * Enables or disables the worker node tasks queue of the pool.
 *
 * When a currently enabled tasks queue gets disabled, task stealing and tasks
 * stealing on back pressure are unset and the tasks queues are flushed before
 * the new state is stored.
 *
 * @param enable - Whether to enable or disable the tasks queue.
 * @param tasksQueueOptions - The optional tasks queue options.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
/**
 * Sets the worker node tasks queue options of the pool.
 *
 * When the tasks queue is enabled, the options are validated, built, stored
 * and applied: queue size, task stealing and tasks stealing on back pressure.
 * When it is not enabled, any stored tasks queue options are removed.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueSize(builtTasksQueueOptions.size as number)
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
/**
 * Applies the given size as the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
/**
 * Installs the task stealing handler as the empty queue callback of every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    // A freshly bound handler is assigned to each worker node.
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
/**
 * Removes the empty queue callback from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
/**
 * Installs the tasks stealing handler as the back pressure callback of every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    // A freshly bound handler is assigned to each worker node.
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
/**
 * Removes the back pressure callback from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
705 private buildTasksQueueOptions (
706 tasksQueueOptions
: TasksQueueOptions
707 ): TasksQueueOptions
{
710 size
: Math.pow(this.maxSize
, 2),
713 tasksStealingOnBackPressure
: true
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes has reached the pool
 * maximum size.
 *
 * @returns The pool filling boolean status.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
729 * Whether the pool is busy or not.
731 * The pool busyness boolean status.
733 protected abstract get
busy (): boolean
736 * Whether worker nodes are executing concurrently their tasks quota or not.
738 * @returns Worker nodes busyness boolean status.
740 protected internalBusy (): boolean {
741 if (this.opts
.enableTasksQueue
=== true) {
743 this.workerNodes
.findIndex(
745 workerNode
.info
.ready
&&
746 workerNode
.usage
.tasks
.executing
<
747 (this.opts
.tasksQueueOptions
?.concurrency
as number)
752 this.workerNodes
.findIndex(
754 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
759 private async sendTaskFunctionOperationToWorker (
760 workerNodeKey
: number,
761 message
: MessageValue
<Data
>
762 ): Promise
<boolean> {
763 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
764 return await new Promise
<boolean>((resolve
, reject
) => {
765 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
767 message
.workerId
=== workerId
&&
768 message
.taskFunctionOperationStatus
=== true
772 message
.workerId
=== workerId
&&
773 message
.taskFunctionOperationStatus
=== false
777 `Task function operation ${
778 message.taskFunctionOperation as string
779 } failed on worker ${message.workerId}`
784 this.sendToWorker(workerNodeKey
, message
)
788 private async sendTaskFunctionOperationToWorkers (
789 message
: Omit
<MessageValue
<Data
>, 'workerId'>
790 ): Promise
<boolean> {
791 return await new Promise
<boolean>((resolve
, reject
) => {
792 const responsesReceived
= new Array<MessageValue
<Data
| Response
>>()
793 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
794 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
795 if (message
.taskFunctionOperationStatus
!= null) {
796 responsesReceived
.push(message
)
798 responsesReceived
.length
=== this.workerNodes
.length
&&
799 responsesReceived
.every(
800 message
=> message
.taskFunctionOperationStatus
=== true
805 responsesReceived
.length
=== this.workerNodes
.length
&&
806 responsesReceived
.some(
807 message
=> message
.taskFunctionOperationStatus
=== false
812 `Task function operation ${
813 message.taskFunctionOperation as string
814 } failed on worker ${message.workerId as number}`
820 this.sendToWorker(workerNodeKey
, message
)
826 public hasTaskFunction (name
: string): boolean {
827 for (const workerNode
of this.workerNodes
) {
829 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
830 workerNode
.info
.taskFunctionNames
.includes(name
)
839 public async addTaskFunction (
841 taskFunction
: TaskFunction
<Data
, Response
>
842 ): Promise
<boolean> {
843 this.taskFunctions
.set(name
, taskFunction
)
844 return await this.sendTaskFunctionOperationToWorkers({
845 taskFunctionOperation
: 'add',
846 taskFunctionName
: name
,
847 taskFunction
: taskFunction
.toString()
/**
 * Removes a task function from the pool.
 *
 * The function is deleted from the map of task functions added at runtime,
 * then a `'remove'` task function operation is sent to the workers.
 *
 * @param name - The task function name.
 * @returns The boolean result of the task function operation sent to the workers.
 */
public async removeTaskFunction (name: string): Promise<boolean> {
  this.taskFunctions.delete(name)
  const operationResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  return operationResult
}
861 public listTaskFunctionNames (): string[] {
862 for (const workerNode
of this.workerNodes
) {
864 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
865 workerNode
.info
.taskFunctionNames
.length
> 0
867 return workerNode
.info
.taskFunctionNames
/**
 * Sets the default task function of the pool by sending a `'default'` task
 * function operation to the workers.
 *
 * @param name - The task function name.
 * @returns The boolean result of the task function operation sent to the workers.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return operationResult
}
/**
 * Whether a task shall be executed right away on the given worker node.
 *
 * That is the case when the worker node tasks queue is empty and the worker
 * node executes fewer tasks than the configured tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return (
    executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
890 public async execute (
893 transferList
?: TransferListItem
[]
894 ): Promise
<Response
> {
895 return await new Promise
<Response
>((resolve
, reject
) => {
897 reject(new Error('Cannot execute a task on not started pool'))
900 if (name
!= null && typeof name
!== 'string') {
901 reject(new TypeError('name argument must be a string'))
906 typeof name
=== 'string' &&
907 name
.trim().length
=== 0
909 reject(new TypeError('name argument must not be an empty string'))
912 if (transferList
!= null && !Array.isArray(transferList
)) {
913 reject(new TypeError('transferList argument must be an array'))
916 const timestamp
= performance
.now()
917 const workerNodeKey
= this.chooseWorkerNode()
918 const task
: Task
<Data
> = {
919 name
: name
?? DEFAULT_TASK_NAME
,
920 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
921 data
: data
?? ({} as Data
),
926 this.promiseResponseMap
.set(task
.taskId
as string, {
932 this.opts
.enableTasksQueue
=== false ||
933 (this.opts
.enableTasksQueue
=== true &&
934 this.shallExecuteTask(workerNodeKey
))
936 this.executeTask(workerNodeKey
, task
)
938 this.enqueueTask(workerNodeKey
, task
)
944 public start (): void {
947 this.workerNodes
.reduce(
948 (accumulator
, workerNode
) =>
949 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
951 ) < this.numberOfWorkers
953 this.createAndSetupWorkerNode()
955 this.starting
= false
960 public async destroy (): Promise
<void> {
962 this.workerNodes
.map(async (_
, workerNodeKey
) => {
963 await this.destroyWorkerNode(workerNodeKey
)
966 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
970 protected async sendKillMessageToWorker (
971 workerNodeKey
: number
973 await new Promise
<void>((resolve
, reject
) => {
974 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
975 if (message
.kill
=== 'success') {
977 } else if (message
.kill
=== 'failure') {
981 message.workerId as number
982 } kill message handling failed`
987 this.sendToWorker(workerNodeKey
, { kill
: true })
992 * Terminates the worker node given its worker node key.
994 * @param workerNodeKey - The worker node key.
996 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
999 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1000 * Can be overridden.
1004 protected setupHook (): void {
1005 /* Intentionally empty */
1009 * Should return whether the worker is the main worker or not.
1011 protected abstract isMain (): boolean
1014 * Hook executed before the worker task execution.
1015 * Can be overridden.
1017 * @param workerNodeKey - The worker node key.
1018 * @param task - The task to execute.
1020 protected beforeTaskExecutionHook (
1021 workerNodeKey
: number,
1024 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1025 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1026 ++workerUsage
.tasks
.executing
1027 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1030 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1031 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1035 const taskFunctionWorkerUsage
= this.workerNodes
[
1037 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1038 ++taskFunctionWorkerUsage
.tasks
.executing
1039 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1044 * Hook executed after the worker task execution.
1045 * Can be overridden.
1047 * @param workerNodeKey - The worker node key.
1048 * @param message - The received message.
1050 protected afterTaskExecutionHook (
1051 workerNodeKey
: number,
1052 message
: MessageValue
<Response
>
1054 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1055 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1056 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1057 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1058 this.updateEluWorkerUsage(workerUsage
, message
)
1061 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1062 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1063 message
.taskPerformance
?.name
as string
1066 const taskFunctionWorkerUsage
= this.workerNodes
[
1068 ].getTaskFunctionWorkerUsage(
1069 message
.taskPerformance
?.name
as string
1071 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1072 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1073 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * The update is needed only when the worker information lists more than two
 * task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return false
  }
  const { taskFunctionNames } = workerInfo
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
/**
 * Updates the tasks counters of a worker usage after a task execution.
 *
 * The executing counter is decremented when it is positive, then either the
 * executed or the failed counter is incremented depending on whether the
 * message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // Never let the executing counter drop below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  const hasFailed = message.workerError != null
  if (hasFailed) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
/**
 * Updates the run time measurement statistics of a worker usage.
 *
 * Nothing is updated when the message carries a worker error. A missing task
 * run time is accounted as `0`.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
1124 private updateWaitTimeWorkerUsage (
1125 workerUsage
: WorkerUsage
,
1128 const timestamp
= performance
.now()
1129 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1130 updateMeasurementStatistics(
1131 workerUsage
.waitTime
,
1132 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1137 private updateEluWorkerUsage (
1138 workerUsage
: WorkerUsage
,
1139 message
: MessageValue
<Response
>
1141 if (message
.workerError
!= null) {
1144 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1145 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1146 updateMeasurementStatistics(
1147 workerUsage
.elu
.active
,
1148 eluTaskStatisticsRequirements
,
1149 message
.taskPerformance
?.elu
?.active
?? 0
1151 updateMeasurementStatistics(
1152 workerUsage
.elu
.idle
,
1153 eluTaskStatisticsRequirements
,
1154 message
.taskPerformance
?.elu
?.idle
?? 0
1156 if (eluTaskStatisticsRequirements
.aggregate
) {
1157 if (message
.taskPerformance
?.elu
!= null) {
1158 if (workerUsage
.elu
.utilization
!= null) {
1159 workerUsage
.elu
.utilization
=
1160 (workerUsage
.elu
.utilization
+
1161 message
.taskPerformance
.elu
.utilization
) /
1164 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
/**
 * Chooses a worker node for the next task.
 *
 * When the conditions for dynamic worker creation are met, a dynamic worker
 * node is created; its key is returned only if the strategy policy allows
 * dynamic worker usage. In every other case the choice is delegated to the
 * worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
/**
 * Conditions for dynamic worker creation: the pool is of dynamic type, it is
 * not full and its worker nodes are internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1199 * Sends a message to worker given its worker node key.
1201 * @param workerNodeKey - The worker node key.
1202 * @param message - The message.
1203 * @param transferList - The optional array of transferable objects.
1205 protected abstract sendToWorker (
1206 workerNodeKey
: number,
1207 message
: MessageValue
<Data
>,
1208 transferList
?: TransferListItem
[]
1212 * Creates a new worker.
1214 * @returns Newly created worker.
1216 protected abstract createWorker (): Worker
1219 * Creates a new, completely set up worker node.
1221 * @returns New, completely set up worker node key.
1223 protected createAndSetupWorkerNode (): number {
1224 const worker
= this.createWorker()
1226 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1227 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1228 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1229 worker
.on('error', error
=> {
1230 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1231 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1232 workerInfo
.ready
= false
1233 this.workerNodes
[workerNodeKey
].closeChannel()
1234 this.emitter
?.emit(PoolEvents
.error
, error
)
1236 this.opts
.restartWorkerOnError
=== true &&
1240 if (workerInfo
.dynamic
) {
1241 this.createAndSetupDynamicWorkerNode()
1243 this.createAndSetupWorkerNode()
1246 if (this.opts
.enableTasksQueue
=== true) {
1247 this.redistributeQueuedTasks(workerNodeKey
)
1250 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1251 worker
.once('exit', () => {
1252 this.removeWorkerNode(worker
)
1255 const workerNodeKey
= this.addWorkerNode(worker
)
1257 this.afterWorkerNodeSetup(workerNodeKey
)
1259 return workerNodeKey
1263 * Creates a new, completely set up dynamic worker node.
1265 * @returns New, completely set up dynamic worker node key.
// Builds a regular worker node through createAndSetupWorkerNode(), then adds
// what is specific to dynamic workers: a message listener reacting to kill
// messages, a message sent to the worker, the replay of the registered task
// functions, and the dynamic/ready flags on the worker info.
1267 protected createAndSetupDynamicWorkerNode (): number {
1268 const workerNodeKey
= this.createAndSetupWorkerNode()
// The worker node key is resolved again inside the listener
// (localWorkerNodeKey) instead of reusing the captured workerNodeKey.
// NOTE(review): presumably because keys shift when nodes are removed - confirm.
1269 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1270 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1273 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1274 // Kill message received from worker
// The worker node is destroyed on a HARD kill, or on a SOFT kill when no task
// is executing and - if the tasks queue is enabled - its queue is empty.
1276 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1277 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1278 ((this.opts
.enableTasksQueue
=== false &&
1279 workerUsage
.tasks
.executing
=== 0) ||
1280 (this.opts
.enableTasksQueue
=== true &&
1281 workerUsage
.tasks
.executing
=== 0 &&
1282 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
// A failed destruction is reported on the pool emitter rather than thrown.
1284 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1285 this.emitter
?.emit(PoolEvents
.error
, error
)
1289 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
// NOTE(review): the payload of this message is not visible in this extract.
1290 this.sendToWorker(workerNodeKey
, {
// Every task function registered on the pool is sent to the new worker as an
// 'add' operation carrying the function source (taskFunction.toString()).
1293 if (this.taskFunctions
.size
> 0) {
1294 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1295 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1296 taskFunctionOperation
: 'add',
1298 taskFunction
: taskFunction
.toString()
1300 this.emitter
?.emit(PoolEvents
.error
, error
)
1304 workerInfo
.dynamic
= true
// NOTE(review): the ready flag assignment below appears to be conditioned on
// the strategy policy (dynamicWorkerReady || dynamicWorkerUsage); the
// enclosing `if` is not visible in this extract - confirm.
1306 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1307 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1309 workerInfo
.ready
= true
1311 this.checkAndEmitDynamicWorkerCreationEvents()
1312 return workerNodeKey
1316 * Registers a listener callback on the worker given its worker node key.
1318 * @param workerNodeKey - The worker node key.
1319 * @param listener - The message listener callback.
// Abstract hook implemented by concrete pools. The listener receives a
// MessageValue parameterized by Message, which is constrained to Data | Response.
// NOTE(review): the return type line is not visible in this extract - confirm.
1321 protected abstract registerWorkerMessageListener
<
1322 Message
extends Data
| Response
1324 workerNodeKey
: number,
1325 listener
: (message
: MessageValue
<Message
>) => void
/**
 * Hook run once a worker node has been freshly created.
 * Subclasses may override it.
 *
 * It registers the pool message listener on the worker, sends the startup and
 * statistics messages and, when the tasks queue is enabled, installs the task
 * stealing callbacks selected in the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Worker messages listening comes first, then the startup and statistics messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Task stealing callbacks only make sense with the tasks queue enabled.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksQueueOptions = this.opts.tasksQueueOptions
  const workerNode = this.workerNodes[workerNodeKey]
  if (tasksQueueOptions?.taskStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
/**
 * Delivers the startup message to the worker attached to the given worker node.
 * Called from `afterWorkerNodeSetup()` right after the message listener registration.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1361 * Sends the statistics message to worker given its worker node key.
1363 * @param workerNodeKey - The worker node key.
// The message is built from the task statistics requirements exposed by the
// worker choice strategy context (read once per visible payload entry).
// NOTE(review): only the `elu` key of the payload is visible in this extract;
// the other keys and the properties read from the requirements are not - confirm.
1365 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1366 this.sendToWorker(workerNodeKey
, {
1369 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1371 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
// Empties the tasks queue of the given worker node: as long as it holds queued
// tasks, a destination worker node is computed, one task is dequeued from the
// source and either executed at once on the destination (shallExecuteTask())
// or enqueued on it.
1377 private redistributeQueuedTasks (workerNodeKey
: number): void {
1378 while (this.tasksQueueSize(workerNodeKey
) > 0) {
// The reduce compares each ready worker node with the current candidate on
// the number of queued tasks.
// NOTE(review): the values returned by the reducer and its initial value are
// not visible in this extract. Verify that the destination can never be the
// source worker node itself, otherwise a task could be re-enqueued on the
// queue being drained and this loop would not make progress.
1379 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1380 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1381 return workerNode
.info
.ready
&&
1382 workerNode
.usage
.tasks
.queued
<
1383 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1389 const task
= this.dequeueTask(workerNodeKey
) as Task
<Data
>
1390 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1391 this.executeTask(destinationWorkerNodeKey
, task
)
1393 this.enqueueTask(destinationWorkerNodeKey
, task
)
/**
 * Accounts for a stolen task in the usage statistics of a worker node.
 *
 * The worker node stolen tasks counter is incremented when the node usage is
 * available. The stolen tasks counter of the task function usage is incremented
 * as well when `shallUpdateTaskFunctionWorkerUsage()` allows it and a usage
 * exists for the given task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Look the task function usage up once and narrow it with a null check,
  // instead of fetching it a first time for the test and a second time for the update.
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
/**
 * Empty queue handler: gives the worker whose queue ran empty one task taken
 * from another worker node.
 *
 * The source is the first ready worker node, other than the requesting one and
 * with at least one queued task, when worker nodes are ordered from the most to
 * the least queued tasks. The task popped from it is executed immediately on
 * the requesting worker node when `shallExecuteTask()` allows it, enqueued on
 * it otherwise, and counted as stolen for that node.
 *
 * @param workerId - The id of the worker whose tasks queue is empty.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Order a copy by descending queued tasks count; the pool worker nodes order is untouched.
  const busiestFirst = [...this.workerNodes].sort(
    (nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued
  )
  const sourceWorkerNode = busiestFirst.find(
    candidate =>
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
/**
 * Back pressure handler: moves tasks queued on the worker node under back
 * pressure to other worker nodes.
 *
 * Candidate worker nodes are visited from the least to the most queued tasks.
 * A candidate receives one task popped from the source worker node when the
 * source still has queued tasks, the candidate is ready, is not the source
 * worker, and has fewer queued tasks than the configured tasks queue size minus
 * `sizeOffset`. The task is executed immediately when `shallExecuteTask()`
 * allows it, enqueued otherwise, and counted as stolen for the candidate.
 *
 * Nothing is done when the configured tasks queue size is not greater than
 * `sizeOffset`.
 *
 * @param workerId - The id of the worker whose node is under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sort a copy: the order of the pool worker nodes, hence the worker node
  // keys, must stay untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueSize - sizeOffset
    ) {
      // The position of a node in the sorted copy is NOT its worker node key:
      // the key is its index in the pool worker nodes array.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        continue
      }
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
/**
 * Builds the listener registered for each worker message.
 *
 * The returned listener first checks the message worker id, then dispatches:
 * - worker ready responses (`ready` and `taskFunctionNames` both set),
 * - task execution responses (`taskId` set),
 * - task function names updates (`taskFunctionNames` set), stored on the worker info.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
      this.getWorkerInfo(workerNodeKey).taskFunctionNames = taskFunctionNames
    }
  }
}
// Handles a worker ready response: the worker info found through the message
// worker id gets its ready flag and task function names from the message, and
// PoolEvents.ready is emitted with the pool info once an emitter exists and
// the pool reports itself ready.
1503 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
// NOTE(review): the statement consuming the failure message below (presumably
// an error being raised) is not visible in this extract - confirm.
1504 if (message
.ready
=== false) {
1506 `Worker ${message.workerId as number} failed to initialize`
1509 const workerInfo
= this.getWorkerInfo(
1510 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1512 workerInfo
.ready
= message
.ready
as boolean
1513 workerInfo
.taskFunctionNames
= message
.taskFunctionNames
1514 if (this.emitter
!= null && this.ready
) {
1515 this.emitter
.emit(PoolEvents
.ready
, this.info
)
/**
 * Handles a task execution response received from a worker.
 *
 * Messages whose task id has no pending entry in the promise response map are
 * ignored. Otherwise the pending promise is rejected with the worker error
 * message (after emitting `PoolEvents.taskError`) or resolved with the
 * response data, the after task execution hook and the worker choice strategy
 * update are run, and the map entry is deleted. When the tasks queue is
 * enabled, the worker node still has queued tasks and is below the configured
 * concurrency, its next queued task is executed.
 *
 * @param message - The message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  if (workerError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    pendingResponse.reject(workerError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  // Feed the worker node with its next queued task when it has capacity left.
  if (
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
// Emits PoolEvents.busy with the pool info on the pool emitter, if any.
// Called by executeTask() after a task has been sent to a worker.
// NOTE(review): the guard around the emit is not visible in this extract -
// confirm the condition under which the event is emitted.
1547 private checkAndEmitTaskExecutionEvents (): void {
1549 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
/**
 * Emits `PoolEvents.backPressure` with the pool info when the pool has back
 * pressure. Called after a task has been enqueued.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
// For dynamic pools only: emits PoolEvents.full with the pool info on the pool
// emitter, if any. Called at the end of createAndSetupDynamicWorkerNode().
// NOTE(review): an inner guard around the emit is not visible in this extract -
// confirm the condition under which the event is emitted.
1559 private checkAndEmitDynamicWorkerCreationEvents (): void {
1560 if (this.type === PoolTypes
.dynamic
) {
1562 this.emitter
?.emit(PoolEvents
.full
, this.info
)
/**
 * Retrieves the information of the worker attached to the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * The second worker node constructor argument is the configured tasks queue
 * size, or the square of the pool maximum size when none is configured.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueSize =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, tasksQueueSize)
  // Worker nodes created while the pool is starting are flagged as ready right away.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
/**
 * Drops the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing when the worker is not
 * found in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
/**
 * Tells whether the given worker node has back pressure.
 * Always `false` when the tasks queue is not enabled.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the tasks queue is enabled and the worker node reports back pressure, `false` otherwise.
 */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
// Pool level back pressure check, used by checkAndEmitTaskQueuingEvents().
// Requires the tasks queue to be enabled and searches the worker nodes for one
// that does NOT report back pressure.
// NOTE(review): the comparison applied to the findIndex() result is not
// visible in this extract (presumably "no such node found", i.e. every worker
// node has back pressure) - confirm.
1622 private hasBackPressure (): boolean {
1624 this.opts
.enableTasksQueue
=== true &&
1625 this.workerNodes
.findIndex(
1626 workerNode
=> !workerNode
.hasBackPressure()
/**
 * Runs the given task on the worker attached to the given worker node: the
 * before task execution hook is called, the task is sent to the worker along
 * with its transfer list, then the task execution events are checked and emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Queues the given task on the given worker node, then checks and emits the
 * task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Reads the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and executed on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1667 private flushTasksQueues (): void {
1668 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1669 this.flushTasksQueue(workerNodeKey
)