1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
6 PromiseResponseWrapper
,
8 } from
'../utility-types'
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
21 import { KillBehaviors
} from
'../worker/worker-options'
22 import type { TaskFunction
} from
'../worker/task-functions'
31 type TasksQueueOptions
41 type MeasurementStatisticsRequirements
,
43 WorkerChoiceStrategies
,
44 type WorkerChoiceStrategy
,
45 type WorkerChoiceStrategyOptions
46 } from
'./selection-strategies/selection-strategies-types'
47 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
48 import { version
} from
'./version'
49 import { WorkerNode
} from
'./worker-node'
52 checkValidTasksQueueOptions
,
53 checkValidWorkerChoiceStrategy
,
54 updateMeasurementStatistics
58 * Base class that implements some shared logic for all poolifier pools.
60 * @typeParam Worker - Type of worker which manages this pool.
61 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
62 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
64 export abstract class AbstractPool
<
65 Worker
extends IWorker
,
68 > implements IPool
<Worker
, Data
, Response
> {
70 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
73 public readonly emitter
?: PoolEmitter
76 * The task execution response promise map:
77 * - `key`: The message id of each submitted task.
78 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
80 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
82 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
83 new Map
<string, PromiseResponseWrapper
<Response
>>()
86 * Worker choice strategy context referencing a worker choice algorithm implementation.
88 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
95 * Dynamic pool maximum size property placeholder.
97 protected readonly max
?: number
100 * The task functions added at runtime map:
101 * - `key`: The task function name.
102 * - `value`: The task function itself.
104 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
107 * Whether the pool is started or not.
109 private started
: boolean
111 * Whether the pool is starting or not.
113 private starting
: boolean
115 * The start timestamp of the pool.
117 private readonly startTimestamp
120 * Constructs a new poolifier pool.
122 * @param numberOfWorkers - Number of workers that this pool should manage.
123 * @param filePath - Path to the worker file.
124 * @param opts - Options for the pool.
127 protected readonly numberOfWorkers
: number,
128 protected readonly filePath
: string,
129 protected readonly opts
: PoolOptions
<Worker
>
131 if (!this.isMain()) {
133 'Cannot start a pool from a worker with the same type as the pool'
136 checkFilePath(this.filePath
)
137 this.checkNumberOfWorkers(this.numberOfWorkers
)
138 this.checkPoolOptions(this.opts
)
140 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
141 this.executeTask
= this.executeTask
.bind(this)
142 this.enqueueTask
= this.enqueueTask
.bind(this)
144 if (this.opts
.enableEvents
=== true) {
145 this.emitter
= new PoolEmitter()
147 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
153 this.opts
.workerChoiceStrategy
,
154 this.opts
.workerChoiceStrategyOptions
159 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
162 this.starting
= false
163 if (this.opts
.startWorkers
=== true) {
167 this.startTimestamp
= performance
.now()
170 private checkNumberOfWorkers (numberOfWorkers
: number): void {
171 if (numberOfWorkers
== null) {
173 'Cannot instantiate a pool without specifying the number of workers'
175 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
177 'Cannot instantiate a pool with a non safe integer number of workers'
179 } else if (numberOfWorkers
< 0) {
180 throw new RangeError(
181 'Cannot instantiate a pool with a negative number of workers'
183 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
184 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
/**
 * Validates the given pool options and stores, on `this.opts`, the default
 * value of every option that was left unset.
 *
 * @param opts - The pool options to check.
 * @throws TypeError If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  // Reject anything that is not a plain object before reading any option.
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.startWorkers = opts.startWorkers ?? true

  // The strategy is validated before its default is applied.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  // User supplied strategy options override the defaults key by key.
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
217 private checkValidWorkerChoiceStrategyOptions (
218 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
221 workerChoiceStrategyOptions
!= null &&
222 !isPlainObject(workerChoiceStrategyOptions
)
225 'Invalid worker choice strategy options: must be a plain object'
229 workerChoiceStrategyOptions
?.retries
!= null &&
230 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
233 'Invalid worker choice strategy options: retries must be an integer'
237 workerChoiceStrategyOptions
?.retries
!= null &&
238 workerChoiceStrategyOptions
.retries
< 0
240 throw new RangeError(
241 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
245 workerChoiceStrategyOptions
?.weights
!= null &&
246 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
249 'Invalid worker choice strategy options: must have a weight for each worker node'
253 workerChoiceStrategyOptions
?.measurement
!= null &&
254 !Object.values(Measurements
).includes(
255 workerChoiceStrategyOptions
.measurement
259 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
265 public get
info (): PoolInfo
{
270 started
: this.started
,
272 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
273 minSize
: this.minSize
,
274 maxSize
: this.maxSize
,
275 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
276 .runTime
.aggregate
&&
277 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
278 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
279 workerNodes
: this.workerNodes
.length
,
280 idleWorkerNodes
: this.workerNodes
.reduce(
281 (accumulator
, workerNode
) =>
282 workerNode
.usage
.tasks
.executing
=== 0
287 busyWorkerNodes
: this.workerNodes
.reduce(
288 (accumulator
, workerNode
) =>
289 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
292 executedTasks
: this.workerNodes
.reduce(
293 (accumulator
, workerNode
) =>
294 accumulator
+ workerNode
.usage
.tasks
.executed
,
297 executingTasks
: this.workerNodes
.reduce(
298 (accumulator
, workerNode
) =>
299 accumulator
+ workerNode
.usage
.tasks
.executing
,
302 ...(this.opts
.enableTasksQueue
=== true && {
303 queuedTasks
: this.workerNodes
.reduce(
304 (accumulator
, workerNode
) =>
305 accumulator
+ workerNode
.usage
.tasks
.queued
,
309 ...(this.opts
.enableTasksQueue
=== true && {
310 maxQueuedTasks
: this.workerNodes
.reduce(
311 (accumulator
, workerNode
) =>
312 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
316 ...(this.opts
.enableTasksQueue
=== true && {
317 backPressure
: this.hasBackPressure()
319 ...(this.opts
.enableTasksQueue
=== true && {
320 stolenTasks
: this.workerNodes
.reduce(
321 (accumulator
, workerNode
) =>
322 accumulator
+ workerNode
.usage
.tasks
.stolen
,
326 failedTasks
: this.workerNodes
.reduce(
327 (accumulator
, workerNode
) =>
328 accumulator
+ workerNode
.usage
.tasks
.failed
,
331 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
332 .runTime
.aggregate
&& {
336 ...this.workerNodes
.map(
337 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
343 ...this.workerNodes
.map(
344 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
348 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
349 .runTime
.average
&& {
352 this.workerNodes
.reduce
<number[]>(
353 (accumulator
, workerNode
) =>
354 accumulator
.concat(workerNode
.usage
.runTime
.history
),
360 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
364 this.workerNodes
.reduce
<number[]>(
365 (accumulator
, workerNode
) =>
366 accumulator
.concat(workerNode
.usage
.runTime
.history
),
374 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
375 .waitTime
.aggregate
&& {
379 ...this.workerNodes
.map(
380 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
386 ...this.workerNodes
.map(
387 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
391 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
392 .waitTime
.average
&& {
395 this.workerNodes
.reduce
<number[]>(
396 (accumulator
, workerNode
) =>
397 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
403 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
404 .waitTime
.median
&& {
407 this.workerNodes
.reduce
<number[]>(
408 (accumulator
, workerNode
) =>
409 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
421 * The pool readiness boolean status.
423 private get
ready (): boolean {
425 this.workerNodes
.reduce(
426 (accumulator
, workerNode
) =>
427 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
/**
 * The approximate pool utilization.
 *
 * Computed as the cumulated tasks run time plus wait time of all worker nodes,
 * divided by the time elapsed since the pool start timestamp multiplied by
 * the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  // Elapsed time is sampled first, before the usage figures are read.
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize

  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }

  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }

  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
459 * If it is `'dynamic'`, it provides the `max` property.
461 protected abstract get
type (): PoolType
466 protected abstract get
worker (): WorkerType
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
/**
 * The pool maximum size: the `max` property when it is set, the number of
 * workers given at construction otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * A worker id is valid when it is defined and matches one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // `-1` means no worker node carries this worker id.
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const isSameWorker = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(isSameWorker)
}
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  const hasWorkerId = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
/**
 * Sets the worker choice strategy used by the pool and, optionally, its options.
 *
 * Every worker node usage is reset and a statistics message is sent to each
 * worker afterwards.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
/**
 * Sets the worker choice strategy options.
 *
 * The given options are validated, merged over the default ones, stored in
 * the pool options and forwarded to the worker choice strategy context.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
/**
 * Enables or disables the worker node tasks queue.
 *
 * When a currently enabled tasks queue gets disabled, task stealing handlers
 * are unset and the tasks queues are flushed before the option is switched.
 *
 * @param enable - Whether to enable or disable the tasks queue.
 * @param tasksQueueOptions - The tasks queue options.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
/**
 * Sets the worker node tasks queue options.
 *
 * When the tasks queue is enabled, the options are validated, built and
 * applied to the worker nodes. Otherwise any stored tasks queue options are
 * removed from the pool options.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any options left over from a previous setup.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }

  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)

  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }

  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
594 private buildTasksQueueOptions (
595 tasksQueueOptions
: TasksQueueOptions
596 ): TasksQueueOptions
{
599 size
: Math.pow(this.maxSize
, 2),
602 tasksStealingOnBackPressure
: true
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The size to apply.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
/**
 * Installs the task stealing handler as the `onEmptyQueue` callback of every
 * worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    // Each worker node gets its own bound handler.
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
/**
 * Removes the `onEmptyQueue` callback from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
/**
 * Installs the tasks stealing handler as the `onBackPressure` callback of
 * every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    // Each worker node gets its own bound handler.
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
/**
 * Removes the `onBackPressure` callback from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
/**
 * Whether the pool is full or not.
 *
 * The pool filling boolean status: `true` once the number of worker nodes
 * has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
650 * Whether the pool is busy or not.
652 * The pool busyness boolean status.
654 protected abstract get
busy (): boolean
657 * Whether worker nodes are executing concurrently their tasks quota or not.
659 * @returns Worker nodes busyness boolean status.
661 protected internalBusy (): boolean {
662 if (this.opts
.enableTasksQueue
=== true) {
664 this.workerNodes
.findIndex(
666 workerNode
.info
.ready
&&
667 workerNode
.usage
.tasks
.executing
<
668 (this.opts
.tasksQueueOptions
?.concurrency
as number)
673 this.workerNodes
.findIndex(
675 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
680 private async sendTaskFunctionOperationToWorker (
681 workerNodeKey
: number,
682 message
: MessageValue
<Data
>
683 ): Promise
<boolean> {
684 return await new Promise
<boolean>((resolve
, reject
) => {
685 const taskFunctionOperationListener
= (
686 message
: MessageValue
<Response
>
688 this.checkMessageWorkerId(message
)
689 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
691 message
.taskFunctionOperationStatus
!= null &&
692 message
.workerId
=== workerId
694 if (message
.taskFunctionOperationStatus
) {
696 } else if (!message
.taskFunctionOperationStatus
) {
699 `Task function operation '${
700 message.taskFunctionOperation as string
701 }' failed on worker ${message.workerId} with error: '${
702 message.workerError?.message as string
707 this.deregisterWorkerMessageListener(
708 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
709 taskFunctionOperationListener
713 this.registerWorkerMessageListener(
715 taskFunctionOperationListener
717 this.sendToWorker(workerNodeKey
, message
)
/**
 * Sends a task function operation message to every worker node and waits
 * until each of them has reported an operation status.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when every worker node reported a
 * successful operation. It is rejected with an `Error` describing the first
 * failed response when at least one worker node reported a failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      // Messages without an operation status are not responses to this operation.
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      // The listener was registered on every worker node, so it has to be
      // deregistered from every worker node. Removing it only from the last
      // responding one leaves it attached, and collecting responses, on all
      // the others.
      // NOTE(review): assumes deregistering a listener that is not attached
      // to a worker node is a no-op, as with EventEmitter#off. Confirm
      // against the concrete pool implementations.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
      if (
        responsesReceived.every(
          received => received.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      } else if (
        responsesReceived.some(
          received => received.taskFunctionOperationStatus === false
        )
      ) {
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${
              errorResponse?.workerId as number
            } with error: '${
              errorResponse?.workerError?.message as string
            }'`
          )
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
777 public hasTaskFunction (name
: string): boolean {
778 for (const workerNode
of this.workerNodes
) {
780 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
781 workerNode
.info
.taskFunctionNames
.includes(name
)
790 public async addTaskFunction (
792 fn
: TaskFunction
<Data
, Response
>
793 ): Promise
<boolean> {
794 if (typeof name
!== 'string') {
795 throw new TypeError('name argument must be a string')
797 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
798 throw new TypeError('name argument must not be an empty string')
800 if (typeof fn
!== 'function') {
801 throw new TypeError('fn argument must be a function')
803 const opResult
= await this.sendTaskFunctionOperationToWorkers({
804 taskFunctionOperation
: 'add',
805 taskFunctionName
: name
,
806 taskFunction
: fn
.toString()
808 this.taskFunctions
.set(name
, fn
)
813 public async removeTaskFunction (name
: string): Promise
<boolean> {
814 if (!this.taskFunctions
.has(name
)) {
816 'Cannot remove a task function not handled on the pool side'
819 const opResult
= await this.sendTaskFunctionOperationToWorkers({
820 taskFunctionOperation
: 'remove',
821 taskFunctionName
: name
823 this.deleteTaskFunctionWorkerUsages(name
)
824 this.taskFunctions
.delete(name
)
829 public listTaskFunctionNames (): string[] {
830 for (const workerNode
of this.workerNodes
) {
832 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
833 workerNode
.info
.taskFunctionNames
.length
> 0
835 return workerNode
.info
.taskFunctionNames
/**
 * Asks every worker node to use the given task function as the default one.
 *
 * @param name - The name of the task function.
 * @returns The result of the task function operation sent to the workers.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return opResult
}
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
/**
 * Whether a task shall be executed right away on the given worker node
 * instead of being enqueued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and the worker node
 * executes fewer tasks than the tasks queue concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return (
    executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
864 public async execute (
867 transferList
?: TransferListItem
[]
868 ): Promise
<Response
> {
869 return await new Promise
<Response
>((resolve
, reject
) => {
871 reject(new Error('Cannot execute a task on not started pool'))
874 if (name
!= null && typeof name
!== 'string') {
875 reject(new TypeError('name argument must be a string'))
880 typeof name
=== 'string' &&
881 name
.trim().length
=== 0
883 reject(new TypeError('name argument must not be an empty string'))
886 if (transferList
!= null && !Array.isArray(transferList
)) {
887 reject(new TypeError('transferList argument must be an array'))
890 const timestamp
= performance
.now()
891 const workerNodeKey
= this.chooseWorkerNode()
892 const task
: Task
<Data
> = {
893 name
: name
?? DEFAULT_TASK_NAME
,
894 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
895 data
: data
?? ({} as Data
),
900 this.promiseResponseMap
.set(task
.taskId
as string, {
906 this.opts
.enableTasksQueue
=== false ||
907 (this.opts
.enableTasksQueue
=== true &&
908 this.shallExecuteTask(workerNodeKey
))
910 this.executeTask(workerNodeKey
, task
)
912 this.enqueueTask(workerNodeKey
, task
)
918 public start (): void {
921 this.workerNodes
.reduce(
922 (accumulator
, workerNode
) =>
923 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
925 ) < this.numberOfWorkers
927 this.createAndSetupWorkerNode()
929 this.starting
= false
934 public async destroy (): Promise
<void> {
936 this.workerNodes
.map(async (_
, workerNodeKey
) => {
937 await this.destroyWorkerNode(workerNodeKey
)
940 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
944 protected async sendKillMessageToWorker (
945 workerNodeKey
: number
947 await new Promise
<void>((resolve
, reject
) => {
948 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
949 this.checkMessageWorkerId(message
)
950 if (message
.kill
=== 'success') {
952 } else if (message
.kill
=== 'failure') {
955 `Kill message handling failed on worker ${
956 message.workerId as number
962 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
963 this.sendToWorker(workerNodeKey
, { kill
: true })
968 * Terminates the worker node given its worker node key.
970 * @param workerNodeKey - The worker node key.
972 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 *
 * The base implementation does nothing.
 */
protected setupHook (): void {
  /* Intentionally empty */
}
985 * Should return whether the worker is the main worker or not.
987 protected abstract isMain (): boolean
990 * Hook executed before the worker task execution.
993 * @param workerNodeKey - The worker node key.
994 * @param task - The task to execute.
996 protected beforeTaskExecutionHook (
997 workerNodeKey
: number,
1000 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1001 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1002 ++workerUsage
.tasks
.executing
1003 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1006 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1007 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1011 const taskFunctionWorkerUsage
= this.workerNodes
[
1013 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1014 ++taskFunctionWorkerUsage
.tasks
.executing
1015 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1020 * Hook executed after the worker task execution.
1021 * Can be overridden.
1023 * @param workerNodeKey - The worker node key.
1024 * @param message - The received message.
1026 protected afterTaskExecutionHook (
1027 workerNodeKey
: number,
1028 message
: MessageValue
<Response
>
1030 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1031 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1032 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1033 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1034 this.updateEluWorkerUsage(workerUsage
, message
)
1037 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1038 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1039 message
.taskPerformance
?.name
as string
1042 const taskFunctionWorkerUsage
= this.workerNodes
[
1044 ].getTaskFunctionWorkerUsage(
1045 message
.taskPerformance
?.name
as string
1047 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1048 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1049 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  // NOTE(review): the threshold of more than two names presumably discounts
  // entries that do not designate a distinct user task function. Confirm
  // against the worker side.
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
/**
 * Updates the tasks counters of the given worker usage once a task response
 * has been received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter is never decremented below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    tasks.executing -= 1
  }
  // A response carrying a worker error counts as a failed task.
  if (message.workerError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
1086 private updateRunTimeWorkerUsage (
1087 workerUsage
: WorkerUsage
,
1088 message
: MessageValue
<Response
>
1090 if (message
.workerError
!= null) {
1093 updateMeasurementStatistics(
1094 workerUsage
.runTime
,
1095 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1096 message
.taskPerformance
?.runTime
?? 0
1100 private updateWaitTimeWorkerUsage (
1101 workerUsage
: WorkerUsage
,
1104 const timestamp
= performance
.now()
1105 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1106 updateMeasurementStatistics(
1107 workerUsage
.waitTime
,
1108 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1113 private updateEluWorkerUsage (
1114 workerUsage
: WorkerUsage
,
1115 message
: MessageValue
<Response
>
1117 if (message
.workerError
!= null) {
1120 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1121 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1122 updateMeasurementStatistics(
1123 workerUsage
.elu
.active
,
1124 eluTaskStatisticsRequirements
,
1125 message
.taskPerformance
?.elu
?.active
?? 0
1127 updateMeasurementStatistics(
1128 workerUsage
.elu
.idle
,
1129 eluTaskStatisticsRequirements
,
1130 message
.taskPerformance
?.elu
?.idle
?? 0
1132 if (eluTaskStatisticsRequirements
.aggregate
) {
1133 if (message
.taskPerformance
?.elu
!= null) {
1134 if (workerUsage
.elu
.utilization
!= null) {
1135 workerUsage
.elu
.utilization
=
1136 (workerUsage
.elu
.utilization
+
1137 message
.taskPerformance
.elu
.utilization
) /
1140 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * When a dynamic worker node has just been created and the strategy policy
 * allows dynamic worker usage, that new worker node is chosen directly.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  return strategyPolicy.dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
/**
 * Conditions for dynamic worker creation: the pool is a dynamic one, it is
 * not full and its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1175 * Sends a message to worker given its worker node key.
1177 * @param workerNodeKey - The worker node key.
1178 * @param message - The message.
1179 * @param transferList - The optional array of transferable objects.
1181 protected abstract sendToWorker (
1182 workerNodeKey
: number,
1183 message
: MessageValue
<Data
>,
1184 transferList
?: TransferListItem
[]
1188 * Creates a new worker.
1190 * @returns Newly created worker.
1192 protected abstract createWorker (): Worker
1195 * Creates a new, completely set up worker node.
1197 * @returns New, completely set up worker node key.
1199 protected createAndSetupWorkerNode (): number {
1200 const worker
= this.createWorker()
1202 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1203 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1204 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1205 worker
.on('error', error
=> {
1206 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1207 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1208 workerInfo
.ready
= false
1209 this.workerNodes
[workerNodeKey
].closeChannel()
1210 this.emitter
?.emit(PoolEvents
.error
, error
)
1214 this.opts
.restartWorkerOnError
=== true
1216 if (workerInfo
.dynamic
) {
1217 this.createAndSetupDynamicWorkerNode()
1219 this.createAndSetupWorkerNode()
1222 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1223 this.redistributeQueuedTasks(workerNodeKey
)
1226 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1227 worker
.once('exit', () => {
1228 this.removeWorkerNode(worker
)
1231 const workerNodeKey
= this.addWorkerNode(worker
)
1233 this.afterWorkerNodeSetup(workerNodeKey
)
1235 return workerNodeKey
/**
 * Creates a new dynamic worker node and completes its setup.
 *
 * On top of the regular worker node setup, a message listener is registered
 * that destroys the worker node when the worker sends a kill message, the
 * pool task functions are sent to the worker and its info is flagged as
 * dynamic.
 *
 * @returns The key of the newly created dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const { usage } = this.workerNodes[senderWorkerNodeKey]
    // A soft kill is honored only when the worker has no task executing and,
    // if the tasks queue is enabled, no task queued either.
    const hasNoPendingWork = (): boolean =>
      usage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0))
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    if (!shallDestroy) {
      return
    }
    // Kill message received from worker
    this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Iterating an empty map is a no-op, so no explicit size check is needed.
  for (const [name, fn] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName: name,
      taskFunction: fn.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  // The strategy policy decides whether a dynamic worker is flagged ready
  // right away instead of waiting for its ready message.
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
/**
 * Registers a message listener callback on the worker identified by its
 * worker node key.
 *
 * @typeParam Message - Type of the payload carried by the messages handed to the listener.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Registers a one-shot message listener callback on the worker identified by
 * its worker node key.
 *
 * @typeParam Message - Type of the payload carried by the message handed to the listener.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Deregisters a message listener callback from the worker identified by its
 * worker node key.
 *
 * @typeParam Message - Type of the payload carried by the messages handed to the listener.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback to deregister.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup
 * and statistics messages and, when the tasks queue is enabled, wires the
 * task stealing callbacks selected in the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].onEmptyQueue =
      this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].onBackPressure =
      this.tasksStealingOnBackPressure.bind(this)
  }
}
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
 * Sends the statistics message to the worker identified by its worker node
 * key.
 *
 * The message carries the run time and ELU settings read from the task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu }
  })
}
/**
 * Moves the tasks queued on the given worker node to the other worker nodes.
 *
 * Each task goes to the ready worker node, other than the source one, with
 * the fewest queued tasks (lowest key wins on ties). It is executed right
 * away when the destination can take it, queued there otherwise.
 *
 * The source worker node is never picked as destination: handing a task back
 * to the node being drained would either run it on that node or re-enqueue
 * it there, in which case the loop would never terminate. When no other
 * ready worker node exists, the remaining tasks are left in the source queue.
 *
 * @param workerNodeKey - The key of the worker node whose queue is drained.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: stop instead of spinning forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
/**
 * Increments the stolen tasks counters of a worker node: the worker node
 * usage one and, when task function usage is tracked for that worker node,
 * the one of the given task function.
 *
 * Does nothing when no worker node exists at the given key.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode == null) {
    // Guard once up front so no later statement dereferences a missing node.
    return
  }
  if (workerNode.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Single lookup, narrowed by the null check instead of a type assertion.
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
/**
 * Empty queue callback: steals one task for the worker whose queue is empty.
 *
 * The task is popped from the ready worker node, other than the requesting
 * one, holding the most queued tasks. It is executed right away when the
 * requesting worker node can take it, queued on it otherwise, and the
 * stolen tasks statistics of the requesting worker node are updated.
 *
 * @param workerId - The id of the worker whose tasks queue is empty.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Sort a copy so that the pool worker nodes order is left untouched.
  const busiestFirst = [...this.workerNodes].sort(
    (nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued
  )
  const sourceWorkerNode = busiestFirst.find(
    node =>
      node.info.ready && node.info.id !== workerId && node.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const task = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, task)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, task)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    task.name as string
  )
}
/**
 * Back pressure callback: hands tasks queued on the worker under back
 * pressure over to other worker nodes.
 *
 * Worker nodes are visited from the least to the most loaded one. Each ready
 * worker node, other than the source one, whose queue holds fewer than the
 * configured tasks queue size minus `sizeOffset` tasks receives one task
 * popped from the source worker node. The task is executed right away when
 * the destination can take it, queued there otherwise, and the destination
 * stolen tasks statistics are updated.
 *
 * Nothing is done when the configured tasks queue size is not greater than
 * `sizeOffset`.
 *
 * @param workerId - The id of the worker whose node is under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sort a copy so that the pool worker nodes order is left untouched.
  const leastLoadedFirst = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of leastLoadedFirst) {
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      // Nothing left to hand over.
      break
    }
    if (
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueSize - sizeOffset
    ) {
      // The position in the sorted copy is not a worker node key: resolve the
      // key against the pool worker nodes so that the task and its statistics
      // land on the worker node that was actually selected.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches on the message
 * content: worker ready response, task execution response or task function
 * names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      )
      workerInfo.taskFunctionNames = taskFunctionNames
    }
  }
}
/**
 * Handles the ready response sent by a worker.
 *
 * Stores the ready flag and the task function names on the worker info, then
 * emits the pool `ready` event with the pool info.
 * NOTE(review): the emission guard is assumed to be the pool `ready` state - confirm.
 *
 * @param message - The ready response message received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports a failed initialization.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const initializationFailed = message.ready === false
  if (initializationFailed) {
    throw new Error(
      `Worker ${message.workerId as number} failed to initialize`
    )
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  workerInfo.taskFunctionNames = message.taskFunctionNames
  if (this.ready) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
/**
 * Handles a task execution response sent by a worker.
 *
 * Settles the promise registered under the message task id (rejecting with
 * the worker error message, or resolving with the response data), runs the
 * after task execution hook, updates the worker choice strategy context and
 * drops the promise map entry. When the tasks queue is enabled, the next
 * queued task of that worker node is executed as long as its executing tasks
 * count is below the configured concurrency.
 *
 * Messages whose task id matches no pending promise are ignored.
 *
 * @param message - The task execution response message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { workerError, data } = message
  if (workerError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    promiseResponse.reject(workerError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId)
  const shallRunNextQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallRunNextQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
/**
 * Emits the pool `busy` event, carrying the pool info, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
/**
 * Emits the pool `backPressure` event, carrying the pool info, when the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
/**
 * Emits the pool `full` event, carrying the pool info, when the pool is a
 * dynamic one and is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
/**
 * Gets the information of the worker held by the worker node at the given
 * key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
/**
 * Wraps the given worker in a worker node and adds it to the pool worker
 * nodes.
 *
 * The worker node is built with the configured tasks queue size, defaulting
 * to the square of the pool maximum size.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueSizeOrDefault =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueSizeOrDefault
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
/**
 * Removes the worker node holding the given worker from the pool worker
 * nodes and from the worker choice strategy context.
 *
 * Does nothing when the worker is not found in the pool worker nodes.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
/**
 * Tells whether the worker node at the given key has back pressure.
 * Always `false` when the tasks queue is not enabled.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the tasks queue is enabled and the worker node has back pressure, `false` otherwise.
 */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
/**
 * Tells whether the pool has back pressure, i.e. the tasks queue is enabled
 * and no worker node is free of back pressure.
 *
 * @returns `true` if the tasks queue is enabled and every worker node has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
/**
 * Executes the given task on the worker identified by its worker node key.
 *
 * Runs the before task execution hook, sends the task (with its transfer
 * list) to the worker, then checks for and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node at the given key, then checks
 * for and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
/**
 * Flushes the tasks queue of the worker node at the given key: every queued
 * task is dequeued and executed on that worker node, then the queue is
 * cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1670 private flushTasksQueues (): void {
1671 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1672 this.flushTasksQueue(workerNodeKey
)