1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { type EventEmitter
, EventEmitterAsyncResource
} from
'node:events'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
22 import { KillBehaviors
} from
'../worker/worker-options'
23 import type { TaskFunction
} from
'../worker/task-functions'
31 type TasksQueueOptions
41 type MeasurementStatisticsRequirements
,
43 WorkerChoiceStrategies
,
44 type WorkerChoiceStrategy
,
45 type WorkerChoiceStrategyOptions
46 } from
'./selection-strategies/selection-strategies-types'
47 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
48 import { version
} from
'./version'
49 import { WorkerNode
} from
'./worker-node'
52 checkValidTasksQueueOptions
,
53 checkValidWorkerChoiceStrategy
,
54 updateMeasurementStatistics
58 * Base class that implements some shared logic for all poolifier pools.
60 * @typeParam Worker - Type of worker which manages this pool.
61 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
62 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
64 export abstract class AbstractPool
<
65 Worker
extends IWorker
,
68 > implements IPool
<Worker
, Data
, Response
> {
70 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
73 public emitter
?: EventEmitter
| EventEmitterAsyncResource
76 * The task execution response promise map:
77 * - `key`: The message id of each submitted task.
78 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
80 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
82 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
83 new Map
<string, PromiseResponseWrapper
<Response
>>()
86 * Worker choice strategy context referencing a worker choice algorithm implementation.
88 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
95 * Dynamic pool maximum size property placeholder.
97 protected readonly max
?: number
100 * The task functions added at runtime map:
101 * - `key`: The task function name.
102 * - `value`: The task function itself.
104 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
107 * Whether the pool is started or not.
109 private started
: boolean
111 * Whether the pool is starting or not.
113 private starting
: boolean
115 * The start timestamp of the pool.
117 private readonly startTimestamp
120 * Constructs a new poolifier pool.
122 * @param numberOfWorkers - Number of workers that this pool should manage.
123 * @param filePath - Path to the worker file.
124 * @param opts - Options for the pool.
127 protected readonly numberOfWorkers
: number,
128 protected readonly filePath
: string,
129 protected readonly opts
: PoolOptions
<Worker
>
131 if (!this.isMain()) {
133 'Cannot start a pool from a worker with the same type as the pool'
136 checkFilePath(this.filePath
)
137 this.checkNumberOfWorkers(this.numberOfWorkers
)
138 this.checkPoolOptions(this.opts
)
140 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
141 this.executeTask
= this.executeTask
.bind(this)
142 this.enqueueTask
= this.enqueueTask
.bind(this)
144 if (this.opts
.enableEvents
=== true) {
145 this.initializeEventEmitter()
147 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
153 this.opts
.workerChoiceStrategy
,
154 this.opts
.workerChoiceStrategyOptions
159 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
162 this.starting
= false
163 if (this.opts
.startWorkers
=== true) {
167 this.startTimestamp
= performance
.now()
170 private checkNumberOfWorkers (numberOfWorkers
: number): void {
171 if (numberOfWorkers
== null) {
173 'Cannot instantiate a pool without specifying the number of workers'
175 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
177 'Cannot instantiate a pool with a non safe integer number of workers'
179 } else if (numberOfWorkers
< 0) {
180 throw new RangeError(
181 'Cannot instantiate a pool with a negative number of workers'
183 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
184 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
188 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
189 if (isPlainObject(opts
)) {
190 this.opts
.startWorkers
= opts
.startWorkers
?? true
191 checkValidWorkerChoiceStrategy(
192 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
194 this.opts
.workerChoiceStrategy
=
195 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
196 this.checkValidWorkerChoiceStrategyOptions(
197 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
199 this.opts
.workerChoiceStrategyOptions
= {
200 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
201 ...opts
.workerChoiceStrategyOptions
203 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
204 this.opts
.enableEvents
= opts
.enableEvents
?? true
205 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
206 if (this.opts
.enableTasksQueue
) {
207 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
208 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
209 opts
.tasksQueueOptions
as TasksQueueOptions
213 throw new TypeError('Invalid pool options: must be a plain object')
217 private checkValidWorkerChoiceStrategyOptions (
218 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
221 workerChoiceStrategyOptions
!= null &&
222 !isPlainObject(workerChoiceStrategyOptions
)
225 'Invalid worker choice strategy options: must be a plain object'
229 workerChoiceStrategyOptions
?.retries
!= null &&
230 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
233 'Invalid worker choice strategy options: retries must be an integer'
237 workerChoiceStrategyOptions
?.retries
!= null &&
238 workerChoiceStrategyOptions
.retries
< 0
240 throw new RangeError(
241 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
245 workerChoiceStrategyOptions
?.weights
!= null &&
246 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
249 'Invalid worker choice strategy options: must have a weight for each worker node'
253 workerChoiceStrategyOptions
?.measurement
!= null &&
254 !Object.values(Measurements
).includes(
255 workerChoiceStrategyOptions
.measurement
259 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
264 private initializeEventEmitter (): void {
265 this.emitter
= new EventEmitterAsyncResource({
266 name
: `poolifier:${this.type}-${this.worker}-pool`
271 public get
info (): PoolInfo
{
276 started
: this.started
,
278 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
279 minSize
: this.minSize
,
280 maxSize
: this.maxSize
,
281 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
282 .runTime
.aggregate
&&
283 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
284 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
285 workerNodes
: this.workerNodes
.length
,
286 idleWorkerNodes
: this.workerNodes
.reduce(
287 (accumulator
, workerNode
) =>
288 workerNode
.usage
.tasks
.executing
=== 0
293 busyWorkerNodes
: this.workerNodes
.reduce(
294 (accumulator
, workerNode
) =>
295 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
298 executedTasks
: this.workerNodes
.reduce(
299 (accumulator
, workerNode
) =>
300 accumulator
+ workerNode
.usage
.tasks
.executed
,
303 executingTasks
: this.workerNodes
.reduce(
304 (accumulator
, workerNode
) =>
305 accumulator
+ workerNode
.usage
.tasks
.executing
,
308 ...(this.opts
.enableTasksQueue
=== true && {
309 queuedTasks
: this.workerNodes
.reduce(
310 (accumulator
, workerNode
) =>
311 accumulator
+ workerNode
.usage
.tasks
.queued
,
315 ...(this.opts
.enableTasksQueue
=== true && {
316 maxQueuedTasks
: this.workerNodes
.reduce(
317 (accumulator
, workerNode
) =>
318 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
322 ...(this.opts
.enableTasksQueue
=== true && {
323 backPressure
: this.hasBackPressure()
325 ...(this.opts
.enableTasksQueue
=== true && {
326 stolenTasks
: this.workerNodes
.reduce(
327 (accumulator
, workerNode
) =>
328 accumulator
+ workerNode
.usage
.tasks
.stolen
,
332 failedTasks
: this.workerNodes
.reduce(
333 (accumulator
, workerNode
) =>
334 accumulator
+ workerNode
.usage
.tasks
.failed
,
337 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
338 .runTime
.aggregate
&& {
342 ...this.workerNodes
.map(
343 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
349 ...this.workerNodes
.map(
350 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
354 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
355 .runTime
.average
&& {
358 this.workerNodes
.reduce
<number[]>(
359 (accumulator
, workerNode
) =>
360 accumulator
.concat(workerNode
.usage
.runTime
.history
),
366 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
370 this.workerNodes
.reduce
<number[]>(
371 (accumulator
, workerNode
) =>
372 accumulator
.concat(workerNode
.usage
.runTime
.history
),
380 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
381 .waitTime
.aggregate
&& {
385 ...this.workerNodes
.map(
386 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
392 ...this.workerNodes
.map(
393 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
397 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
398 .waitTime
.average
&& {
401 this.workerNodes
.reduce
<number[]>(
402 (accumulator
, workerNode
) =>
403 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
409 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
410 .waitTime
.median
&& {
413 this.workerNodes
.reduce
<number[]>(
414 (accumulator
, workerNode
) =>
415 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
427 * The pool readiness boolean status.
429 private get
ready (): boolean {
431 this.workerNodes
.reduce(
432 (accumulator
, workerNode
) =>
433 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
442 * The approximate pool utilization.
444 * @returns The pool utilization.
446 private get
utilization (): number {
447 const poolTimeCapacity
=
448 (performance
.now() - this.startTimestamp
) * this.maxSize
449 const totalTasksRunTime
= this.workerNodes
.reduce(
450 (accumulator
, workerNode
) =>
451 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
454 const totalTasksWaitTime
= this.workerNodes
.reduce(
455 (accumulator
, workerNode
) =>
456 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
459 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
465 * If it is `'dynamic'`, it provides the `max` property.
467 protected abstract get
type (): PoolType
472 protected abstract get
worker (): WorkerType
475 * The pool minimum size.
477 protected get
minSize (): number {
478 return this.numberOfWorkers
482 * The pool maximum size.
484 protected get
maxSize (): number {
485 return this.max
?? this.numberOfWorkers
489 * Checks if the worker id sent in the received message from a worker is valid.
491 * @param message - The received message.
492 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
494 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
495 if (message
.workerId
== null) {
496 throw new Error('Worker message received without worker id')
498 message
.workerId
!= null &&
499 this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1
502 `Worker message received from unknown worker '${message.workerId}'`
508 * Gets the given worker its worker node key.
510 * @param worker - The worker.
511 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
513 private getWorkerNodeKeyByWorker (worker
: Worker
): number {
514 return this.workerNodes
.findIndex(
515 workerNode
=> workerNode
.worker
=== worker
520 * Gets the worker node key given its worker id.
522 * @param workerId - The worker id.
523 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
525 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
526 return this.workerNodes
.findIndex(
527 workerNode
=> workerNode
.info
.id
=== workerId
532 public setWorkerChoiceStrategy (
533 workerChoiceStrategy
: WorkerChoiceStrategy
,
534 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
536 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
537 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
538 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
539 this.opts
.workerChoiceStrategy
541 if (workerChoiceStrategyOptions
!= null) {
542 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
544 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
545 workerNode
.resetUsage()
546 this.sendStatisticsMessageToWorker(workerNodeKey
)
551 public setWorkerChoiceStrategyOptions (
552 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
554 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
555 this.opts
.workerChoiceStrategyOptions
= {
556 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
557 ...workerChoiceStrategyOptions
559 this.workerChoiceStrategyContext
.setOptions(
560 this.opts
.workerChoiceStrategyOptions
565 public enableTasksQueue (
567 tasksQueueOptions
?: TasksQueueOptions
569 if (this.opts
.enableTasksQueue
=== true && !enable
) {
570 this.unsetTaskStealing()
571 this.unsetTasksStealingOnBackPressure()
572 this.flushTasksQueues()
574 this.opts
.enableTasksQueue
= enable
575 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
579 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
580 if (this.opts
.enableTasksQueue
=== true) {
581 checkValidTasksQueueOptions(tasksQueueOptions
)
582 this.opts
.tasksQueueOptions
=
583 this.buildTasksQueueOptions(tasksQueueOptions
)
584 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
585 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
586 this.setTaskStealing()
588 this.unsetTaskStealing()
590 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
591 this.setTasksStealingOnBackPressure()
593 this.unsetTasksStealingOnBackPressure()
595 } else if (this.opts
.tasksQueueOptions
!= null) {
596 delete this.opts
.tasksQueueOptions
600 private buildTasksQueueOptions (
601 tasksQueueOptions
: TasksQueueOptions
602 ): TasksQueueOptions
{
605 size
: Math.pow(this.maxSize
, 2),
608 tasksStealingOnBackPressure
: true
614 private setTasksQueueSize (size
: number): void {
615 for (const workerNode
of this.workerNodes
) {
616 workerNode
.tasksQueueBackPressureSize
= size
620 private setTaskStealing (): void {
621 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
622 this.workerNodes
[workerNodeKey
].onEmptyQueue
=
623 this.taskStealingOnEmptyQueue
.bind(this)
627 private unsetTaskStealing (): void {
628 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
629 delete this.workerNodes
[workerNodeKey
].onEmptyQueue
633 private setTasksStealingOnBackPressure (): void {
634 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
635 this.workerNodes
[workerNodeKey
].onBackPressure
=
636 this.tasksStealingOnBackPressure
.bind(this)
640 private unsetTasksStealingOnBackPressure (): void {
641 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
642 delete this.workerNodes
[workerNodeKey
].onBackPressure
647 * Whether the pool is full or not.
649 * The pool filling boolean status.
651 protected get
full (): boolean {
652 return this.workerNodes
.length
>= this.maxSize
656 * Whether the pool is busy or not.
658 * The pool busyness boolean status.
660 protected abstract get
busy (): boolean
663 * Whether worker nodes are executing concurrently their tasks quota or not.
665 * @returns Worker nodes busyness boolean status.
667 protected internalBusy (): boolean {
668 if (this.opts
.enableTasksQueue
=== true) {
670 this.workerNodes
.findIndex(
672 workerNode
.info
.ready
&&
673 workerNode
.usage
.tasks
.executing
<
674 (this.opts
.tasksQueueOptions
?.concurrency
as number)
679 this.workerNodes
.findIndex(
681 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
686 private async sendTaskFunctionOperationToWorker (
687 workerNodeKey
: number,
688 message
: MessageValue
<Data
>
689 ): Promise
<boolean> {
690 return await new Promise
<boolean>((resolve
, reject
) => {
691 const taskFunctionOperationListener
= (
692 message
: MessageValue
<Response
>
694 this.checkMessageWorkerId(message
)
695 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
697 message
.taskFunctionOperationStatus
!= null &&
698 message
.workerId
=== workerId
700 if (message
.taskFunctionOperationStatus
) {
702 } else if (!message
.taskFunctionOperationStatus
) {
705 `Task function operation '${
706 message.taskFunctionOperation as string
707 }' failed on worker ${message.workerId} with error: '${
708 message.workerError?.message as string
713 this.deregisterWorkerMessageListener(
714 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
715 taskFunctionOperationListener
719 this.registerWorkerMessageListener(
721 taskFunctionOperationListener
723 this.sendToWorker(workerNodeKey
, message
)
727 private async sendTaskFunctionOperationToWorkers (
728 message
: MessageValue
<Data
>
729 ): Promise
<boolean> {
730 return await new Promise
<boolean>((resolve
, reject
) => {
731 const responsesReceived
= new Array<MessageValue
<Response
>>()
732 const taskFunctionOperationsListener
= (
733 message
: MessageValue
<Response
>
735 this.checkMessageWorkerId(message
)
736 if (message
.taskFunctionOperationStatus
!= null) {
737 responsesReceived
.push(message
)
738 if (responsesReceived
.length
=== this.workerNodes
.length
) {
740 responsesReceived
.every(
741 message
=> message
.taskFunctionOperationStatus
=== true
746 responsesReceived
.some(
747 message
=> message
.taskFunctionOperationStatus
=== false
750 const errorResponse
= responsesReceived
.find(
751 response
=> response
.taskFunctionOperationStatus
=== false
755 `Task function operation '${
756 message.taskFunctionOperation as string
757 }' failed on worker ${
758 errorResponse?.workerId as number
760 errorResponse?.workerError?.message as string
765 this.deregisterWorkerMessageListener(
766 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
767 taskFunctionOperationsListener
772 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
773 this.registerWorkerMessageListener(
775 taskFunctionOperationsListener
777 this.sendToWorker(workerNodeKey
, message
)
783 public hasTaskFunction (name
: string): boolean {
784 for (const workerNode
of this.workerNodes
) {
786 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
787 workerNode
.info
.taskFunctionNames
.includes(name
)
796 public async addTaskFunction (
798 fn
: TaskFunction
<Data
, Response
>
799 ): Promise
<boolean> {
800 if (typeof name
!== 'string') {
801 throw new TypeError('name argument must be a string')
803 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
804 throw new TypeError('name argument must not be an empty string')
806 if (typeof fn
!== 'function') {
807 throw new TypeError('fn argument must be a function')
809 const opResult
= await this.sendTaskFunctionOperationToWorkers({
810 taskFunctionOperation
: 'add',
811 taskFunctionName
: name
,
812 taskFunction
: fn
.toString()
814 this.taskFunctions
.set(name
, fn
)
819 public async removeTaskFunction (name
: string): Promise
<boolean> {
820 if (!this.taskFunctions
.has(name
)) {
822 'Cannot remove a task function not handled on the pool side'
825 const opResult
= await this.sendTaskFunctionOperationToWorkers({
826 taskFunctionOperation
: 'remove',
827 taskFunctionName
: name
829 this.deleteTaskFunctionWorkerUsages(name
)
830 this.taskFunctions
.delete(name
)
835 public listTaskFunctionNames (): string[] {
836 for (const workerNode
of this.workerNodes
) {
838 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
839 workerNode
.info
.taskFunctionNames
.length
> 0
841 return workerNode
.info
.taskFunctionNames
848 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
849 return await this.sendTaskFunctionOperationToWorkers({
850 taskFunctionOperation
: 'default',
851 taskFunctionName
: name
855 private deleteTaskFunctionWorkerUsages (name
: string): void {
856 for (const workerNode
of this.workerNodes
) {
857 workerNode
.deleteTaskFunctionWorkerUsage(name
)
861 private shallExecuteTask (workerNodeKey
: number): boolean {
863 this.tasksQueueSize(workerNodeKey
) === 0 &&
864 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
865 (this.opts
.tasksQueueOptions
?.concurrency
as number)
870 public async execute (
873 transferList
?: TransferListItem
[]
874 ): Promise
<Response
> {
875 return await new Promise
<Response
>((resolve
, reject
) => {
877 reject(new Error('Cannot execute a task on not started pool'))
880 if (name
!= null && typeof name
!== 'string') {
881 reject(new TypeError('name argument must be a string'))
886 typeof name
=== 'string' &&
887 name
.trim().length
=== 0
889 reject(new TypeError('name argument must not be an empty string'))
892 if (transferList
!= null && !Array.isArray(transferList
)) {
893 reject(new TypeError('transferList argument must be an array'))
896 const timestamp
= performance
.now()
897 const workerNodeKey
= this.chooseWorkerNode()
898 const task
: Task
<Data
> = {
899 name
: name
?? DEFAULT_TASK_NAME
,
900 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
901 data
: data
?? ({} as Data
),
906 this.promiseResponseMap
.set(task
.taskId
as string, {
912 this.opts
.enableTasksQueue
=== false ||
913 (this.opts
.enableTasksQueue
=== true &&
914 this.shallExecuteTask(workerNodeKey
))
916 this.executeTask(workerNodeKey
, task
)
918 this.enqueueTask(workerNodeKey
, task
)
924 public start (): void {
927 this.workerNodes
.reduce(
928 (accumulator
, workerNode
) =>
929 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
931 ) < this.numberOfWorkers
933 this.createAndSetupWorkerNode()
935 this.starting
= false
940 public async destroy (): Promise
<void> {
942 this.workerNodes
.map(async (_
, workerNodeKey
) => {
943 await this.destroyWorkerNode(workerNodeKey
)
946 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
947 if (this.emitter
instanceof EventEmitterAsyncResource
) {
948 this.emitter
?.emitDestroy()
953 protected async sendKillMessageToWorker (
954 workerNodeKey
: number
956 await new Promise
<void>((resolve
, reject
) => {
957 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
958 this.checkMessageWorkerId(message
)
959 if (message
.kill
=== 'success') {
961 } else if (message
.kill
=== 'failure') {
964 `Kill message handling failed on worker ${
965 message.workerId as number
971 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
972 this.sendToWorker(workerNodeKey
, { kill
: true })
977 * Terminates the worker node given its worker node key.
979 * @param workerNodeKey - The worker node key.
981 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
984 * Setup hook to execute code before worker nodes are created in the abstract constructor.
989 protected setupHook (): void {
990 /* Intentionally empty */
994 * Should return whether the worker is the main worker or not.
996 protected abstract isMain (): boolean
999 * Hook executed before the worker task execution.
1000 * Can be overridden.
1002 * @param workerNodeKey - The worker node key.
1003 * @param task - The task to execute.
1005 protected beforeTaskExecutionHook (
1006 workerNodeKey
: number,
1009 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1010 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1011 ++workerUsage
.tasks
.executing
1012 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1015 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1016 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1020 const taskFunctionWorkerUsage
= this.workerNodes
[
1022 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1023 ++taskFunctionWorkerUsage
.tasks
.executing
1024 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1029 * Hook executed after the worker task execution.
1030 * Can be overridden.
1032 * @param workerNodeKey - The worker node key.
1033 * @param message - The received message.
1035 protected afterTaskExecutionHook (
1036 workerNodeKey
: number,
1037 message
: MessageValue
<Response
>
1039 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1040 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1041 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1042 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1043 this.updateEluWorkerUsage(workerUsage
, message
)
1046 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1047 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1048 message
.taskPerformance
?.name
as string
1051 const taskFunctionWorkerUsage
= this.workerNodes
[
1053 ].getTaskFunctionWorkerUsage(
1054 message
.taskPerformance
?.name
as string
1056 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1057 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1058 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
1063 * Whether the worker node shall update its task function worker usage or not.
1065 * @param workerNodeKey - The worker node key.
1066 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1068 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1069 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1071 workerInfo
!= null &&
1072 Array.isArray(workerInfo
.taskFunctionNames
) &&
1073 workerInfo
.taskFunctionNames
.length
> 2
1077 private updateTaskStatisticsWorkerUsage (
1078 workerUsage
: WorkerUsage
,
1079 message
: MessageValue
<Response
>
1081 const workerTaskStatistics
= workerUsage
.tasks
1083 workerTaskStatistics
.executing
!= null &&
1084 workerTaskStatistics
.executing
> 0
1086 --workerTaskStatistics
.executing
1088 if (message
.workerError
== null) {
1089 ++workerTaskStatistics
.executed
1091 ++workerTaskStatistics
.failed
1095 private updateRunTimeWorkerUsage (
1096 workerUsage
: WorkerUsage
,
1097 message
: MessageValue
<Response
>
1099 if (message
.workerError
!= null) {
1102 updateMeasurementStatistics(
1103 workerUsage
.runTime
,
1104 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1105 message
.taskPerformance
?.runTime
?? 0
1109 private updateWaitTimeWorkerUsage (
1110 workerUsage
: WorkerUsage
,
1113 const timestamp
= performance
.now()
1114 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1115 updateMeasurementStatistics(
1116 workerUsage
.waitTime
,
1117 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1122 private updateEluWorkerUsage (
1123 workerUsage
: WorkerUsage
,
1124 message
: MessageValue
<Response
>
1126 if (message
.workerError
!= null) {
1129 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1130 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1131 updateMeasurementStatistics(
1132 workerUsage
.elu
.active
,
1133 eluTaskStatisticsRequirements
,
1134 message
.taskPerformance
?.elu
?.active
?? 0
1136 updateMeasurementStatistics(
1137 workerUsage
.elu
.idle
,
1138 eluTaskStatisticsRequirements
,
1139 message
.taskPerformance
?.elu
?.idle
?? 0
1141 if (eluTaskStatisticsRequirements
.aggregate
) {
1142 if (message
.taskPerformance
?.elu
!= null) {
1143 if (workerUsage
.elu
.utilization
!= null) {
1144 workerUsage
.elu
.utilization
=
1145 (workerUsage
.elu
.utilization
+
1146 message
.taskPerformance
.elu
.utilization
) /
1149 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
1156 * Chooses a worker node for the next task.
1158 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1160 * @returns The chosen worker node key
1162 private chooseWorkerNode (): number {
1163 if (this.shallCreateDynamicWorker()) {
1164 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1166 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1168 return workerNodeKey
1171 return this.workerChoiceStrategyContext
.execute()
1175 * Conditions for dynamic worker creation.
1177 * @returns Whether to create a dynamic worker or not.
1179 private shallCreateDynamicWorker (): boolean {
1180 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
1184 * Sends a message to worker given its worker node key.
1186 * @param workerNodeKey - The worker node key.
1187 * @param message - The message.
1188 * @param transferList - The optional array of transferable objects.
1190 protected abstract sendToWorker (
1191 workerNodeKey
: number,
1192 message
: MessageValue
<Data
>,
1193 transferList
?: TransferListItem
[]
1197 * Creates a new worker.
1199 * @returns Newly created worker.
1201 protected abstract createWorker (): Worker
1204 * Creates a new, completely set up worker node.
1206 * @returns New, completely set up worker node key.
1208 protected createAndSetupWorkerNode (): number {
1209 const worker
= this.createWorker()
1211 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1212 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1213 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1214 worker
.on('error', error
=> {
1215 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1216 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1217 workerInfo
.ready
= false
1218 this.workerNodes
[workerNodeKey
].closeChannel()
1219 this.emitter
?.emit(PoolEvents
.error
, error
)
1223 this.opts
.restartWorkerOnError
=== true
1225 if (workerInfo
.dynamic
) {
1226 this.createAndSetupDynamicWorkerNode()
1228 this.createAndSetupWorkerNode()
1231 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1232 this.redistributeQueuedTasks(workerNodeKey
)
1235 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1236 worker
.once('exit', () => {
1237 this.removeWorkerNode(worker
)
1240 const workerNodeKey
= this.addWorkerNode(worker
)
1242 this.afterWorkerNodeSetup(workerNodeKey
)
1244 return workerNodeKey
1248 * Creates a new, completely set up dynamic worker node.
1250 * @returns New, completely set up dynamic worker node key.
1252 protected createAndSetupDynamicWorkerNode (): number {
1253 const workerNodeKey
= this.createAndSetupWorkerNode()
1254 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1255 this.checkMessageWorkerId(message
)
1256 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1259 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1260 // Kill message received from worker
1262 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1263 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1264 ((this.opts
.enableTasksQueue
=== false &&
1265 workerUsage
.tasks
.executing
=== 0) ||
1266 (this.opts
.enableTasksQueue
=== true &&
1267 workerUsage
.tasks
.executing
=== 0 &&
1268 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1270 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1271 this.emitter
?.emit(PoolEvents
.error
, error
)
1275 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1276 this.sendToWorker(workerNodeKey
, {
1279 if (this.taskFunctions
.size
> 0) {
1280 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1281 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1282 taskFunctionOperation
: 'add',
1284 taskFunction
: taskFunction
.toString()
1286 this.emitter
?.emit(PoolEvents
.error
, error
)
1290 workerInfo
.dynamic
= true
1292 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1293 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1295 workerInfo
.ready
= true
1297 this.checkAndEmitDynamicWorkerCreationEvents()
1298 return workerNodeKey
1302 * Registers a listener callback on the worker given its worker node key.
1304 * @param workerNodeKey - The worker node key.
1305 * @param listener - The message listener callback.
1307 protected abstract registerWorkerMessageListener
<
1308 Message
extends Data
| Response
1310 workerNodeKey
: number,
1311 listener
: (message
: MessageValue
<Message
>) => void
1315 * Registers once a listener callback on the worker given its worker node key.
1317 * @param workerNodeKey - The worker node key.
1318 * @param listener - The message listener callback.
1320 protected abstract registerOnceWorkerMessageListener
<
1321 Message
extends Data
| Response
1323 workerNodeKey
: number,
1324 listener
: (message
: MessageValue
<Message
>) => void
1328 * Deregisters a listener callback on the worker given its worker node key.
1330 * @param workerNodeKey - The worker node key.
1331 * @param listener - The message listener callback.
1333 protected abstract deregisterWorkerMessageListener
<
1334 Message
extends Data
| Response
1336 workerNodeKey
: number,
1337 listener
: (message
: MessageValue
<Message
>) => void
1341 * Method hooked up after a worker node has been newly created.
1342 * Can be overridden.
1344 * @param workerNodeKey - The newly created worker node key.
1346 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1347 // Listen to worker messages.
1348 this.registerWorkerMessageListener(workerNodeKey
, this.workerListener())
1349 // Send the startup message to worker.
1350 this.sendStartupMessageToWorker(workerNodeKey
)
1351 // Send the statistics message to worker.
1352 this.sendStatisticsMessageToWorker(workerNodeKey
)
1353 if (this.opts
.enableTasksQueue
=== true) {
1354 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1355 this.workerNodes
[workerNodeKey
].onEmptyQueue
=
1356 this.taskStealingOnEmptyQueue
.bind(this)
1358 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1359 this.workerNodes
[workerNodeKey
].onBackPressure
=
1360 this.tasksStealingOnBackPressure
.bind(this)
1366 * Sends the startup message to worker given its worker node key.
1368 * @param workerNodeKey - The worker node key.
1370 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1373 * Sends the statistics message to worker given its worker node key.
1375 * @param workerNodeKey - The worker node key.
1377 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1378 this.sendToWorker(workerNodeKey
, {
1381 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1383 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1389 private redistributeQueuedTasks (workerNodeKey
: number): void {
1390 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1391 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1392 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1393 return workerNode
.info
.ready
&&
1394 workerNode
.usage
.tasks
.queued
<
1395 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1401 const task
= this.dequeueTask(workerNodeKey
) as Task
<Data
>
1402 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1403 this.executeTask(destinationWorkerNodeKey
, task
)
1405 this.enqueueTask(destinationWorkerNodeKey
, task
)
1410 private updateTaskStolenStatisticsWorkerUsage (
1411 workerNodeKey
: number,
1414 const workerNode
= this.workerNodes
[workerNodeKey
]
1415 if (workerNode
?.usage
!= null) {
1416 ++workerNode
.usage
.tasks
.stolen
1419 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1420 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1422 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1425 ++taskFunctionWorkerUsage
.tasks
.stolen
1429 private taskStealingOnEmptyQueue (workerId
: number): void {
1430 const destinationWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(workerId
)
1431 const workerNodes
= this.workerNodes
1434 (workerNodeA
, workerNodeB
) =>
1435 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1437 const sourceWorkerNode
= workerNodes
.find(
1439 workerNode
.info
.ready
&&
1440 workerNode
.info
.id
!== workerId
&&
1441 workerNode
.usage
.tasks
.queued
> 0
1443 if (sourceWorkerNode
!= null) {
1444 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1445 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1446 this.executeTask(destinationWorkerNodeKey
, task
)
1448 this.enqueueTask(destinationWorkerNodeKey
, task
)
1450 this.updateTaskStolenStatisticsWorkerUsage(
1451 destinationWorkerNodeKey
,
1457 private tasksStealingOnBackPressure (workerId
: number): void {
1458 const sizeOffset
= 1
1459 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1462 const sourceWorkerNode
=
1463 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1464 const workerNodes
= this.workerNodes
1467 (workerNodeA
, workerNodeB
) =>
1468 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1470 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1472 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1473 workerNode
.info
.ready
&&
1474 workerNode
.info
.id
!== workerId
&&
1475 workerNode
.usage
.tasks
.queued
<
1476 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1478 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1479 if (this.shallExecuteTask(workerNodeKey
)) {
1480 this.executeTask(workerNodeKey
, task
)
1482 this.enqueueTask(workerNodeKey
, task
)
1484 this.updateTaskStolenStatisticsWorkerUsage(
1493 * This method is the listener registered for each worker message.
1495 * @returns The listener function to execute when a message is received from a worker.
1497 protected workerListener (): (message
: MessageValue
<Response
>) => void {
1499 this.checkMessageWorkerId(message
)
1500 if (message
.ready
!= null && message
.taskFunctionNames
!= null) {
1501 // Worker ready response received from worker
1502 this.handleWorkerReadyResponse(message
)
1503 } else if (message
.taskId
!= null) {
1504 // Task execution response received from worker
1505 this.handleTaskExecutionResponse(message
)
1506 } else if (message
.taskFunctionNames
!= null) {
1507 // Task function names message received from worker
1509 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1510 ).taskFunctionNames
= message
.taskFunctionNames
1515 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1516 if (message
.ready
=== false) {
1518 `Worker ${message.workerId as number} failed to initialize`
1521 const workerInfo
= this.getWorkerInfo(
1522 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1524 workerInfo
.ready
= message
.ready
as boolean
1525 workerInfo
.taskFunctionNames
= message
.taskFunctionNames
1527 this.emitter
?.emit(PoolEvents
.ready
, this.info
)
1531 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1532 const { taskId
, workerError
, data
} = message
1533 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1534 if (promiseResponse
!= null) {
1535 if (workerError
!= null) {
1536 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1537 promiseResponse
.reject(workerError
.message
)
1539 promiseResponse
.resolve(data
as Response
)
1541 const workerNodeKey
= promiseResponse
.workerNodeKey
1542 this.afterTaskExecutionHook(workerNodeKey
, message
)
1543 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1544 this.promiseResponseMap
.delete(taskId
as string)
1546 this.opts
.enableTasksQueue
=== true &&
1547 this.tasksQueueSize(workerNodeKey
) > 0 &&
1548 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
1549 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1553 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1559 private checkAndEmitTaskExecutionEvents (): void {
1561 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1565 private checkAndEmitTaskQueuingEvents (): void {
1566 if (this.hasBackPressure()) {
1567 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1571 private checkAndEmitDynamicWorkerCreationEvents (): void {
1572 if (this.type === PoolTypes
.dynamic
) {
1574 this.emitter
?.emit(PoolEvents
.full
, this.info
)
1580 * Gets the worker information given its worker node key.
1582 * @param workerNodeKey - The worker node key.
1583 * @returns The worker information.
1585 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
{
1586 return this.workerNodes
[workerNodeKey
].info
1590 * Adds the given worker in the pool worker nodes.
1592 * @param worker - The worker.
1593 * @returns The added worker node key.
1594 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1596 private addWorkerNode (worker
: Worker
): number {
1597 const workerNode
= new WorkerNode
<Worker
, Data
>(
1599 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1601 // Flag the worker node as ready at pool startup.
1602 if (this.starting
) {
1603 workerNode
.info
.ready
= true
1605 this.workerNodes
.push(workerNode
)
1606 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1607 if (workerNodeKey
=== -1) {
1608 throw new Error('Worker added not found in worker nodes')
1610 return workerNodeKey
1614 * Removes the given worker from the pool worker nodes.
1616 * @param worker - The worker.
1618 private removeWorkerNode (worker
: Worker
): void {
1619 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1620 if (workerNodeKey
!== -1) {
1621 this.workerNodes
.splice(workerNodeKey
, 1)
1622 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
1627 public hasWorkerNodeBackPressure (workerNodeKey
: number): boolean {
1629 this.opts
.enableTasksQueue
=== true &&
1630 this.workerNodes
[workerNodeKey
].hasBackPressure()
1634 private hasBackPressure (): boolean {
1636 this.opts
.enableTasksQueue
=== true &&
1637 this.workerNodes
.findIndex(
1638 workerNode
=> !workerNode
.hasBackPressure()
1644 * Executes the given task on the worker given its worker node key.
1646 * @param workerNodeKey - The worker node key.
1647 * @param task - The task to execute.
1649 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1650 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1651 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1652 this.checkAndEmitTaskExecutionEvents()
1655 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1656 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1657 this.checkAndEmitTaskQueuingEvents()
1658 return tasksQueueSize
1661 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1662 return this.workerNodes
[workerNodeKey
].dequeueTask()
1665 private tasksQueueSize (workerNodeKey
: number): number {
1666 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1669 protected flushTasksQueue (workerNodeKey
: number): void {
1670 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1673 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1676 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1679 private flushTasksQueues (): void {
1680 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1681 this.flushTasksQueue(workerNodeKey
)