1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
22 import { KillBehaviors
} from
'../worker/worker-options'
23 import type { TaskFunction
} from
'../worker/task-functions'
31 type TasksQueueOptions
37 WorkerNodeEventDetail
,
42 type MeasurementStatisticsRequirements
,
44 WorkerChoiceStrategies
,
45 type WorkerChoiceStrategy
,
46 type WorkerChoiceStrategyOptions
47 } from
'./selection-strategies/selection-strategies-types'
48 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
49 import { version
} from
'./version'
50 import { WorkerNode
} from
'./worker-node'
53 checkValidTasksQueueOptions
,
54 checkValidWorkerChoiceStrategy
,
55 updateMeasurementStatistics
59 * Base class that implements some shared logic for all poolifier pools.
61 * @typeParam Worker - Type of worker which manages this pool.
62 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
63 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
65 export abstract class AbstractPool
<
66 Worker
extends IWorker
,
69 > implements IPool
<Worker
, Data
, Response
> {
71 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
74 public emitter
?: EventEmitterAsyncResource
77 * Dynamic pool maximum size property placeholder.
79 protected readonly max
?: number
82 * The task execution response promise map:
83 * - `key`: The message id of each submitted task.
84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
88 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
89 new Map
<string, PromiseResponseWrapper
<Response
>>()
92 * Worker choice strategy context referencing a worker choice algorithm implementation.
94 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
105 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
108 * Whether the pool is started or not.
110 private started
: boolean
112 * Whether the pool is starting or not.
114 private starting
: boolean
116 * The start timestamp of the pool.
118 private readonly startTimestamp
121 * Constructs a new poolifier pool.
123 * @param numberOfWorkers - Number of workers that this pool should manage.
124 * @param filePath - Path to the worker file.
125 * @param opts - Options for the pool.
128 protected readonly numberOfWorkers
: number,
129 protected readonly filePath
: string,
130 protected readonly opts
: PoolOptions
<Worker
>
132 if (!this.isMain()) {
134 'Cannot start a pool from a worker with the same type as the pool'
137 checkFilePath(this.filePath
)
138 this.checkNumberOfWorkers(this.numberOfWorkers
)
139 this.checkPoolOptions(this.opts
)
141 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
142 this.executeTask
= this.executeTask
.bind(this)
143 this.enqueueTask
= this.enqueueTask
.bind(this)
145 if (this.opts
.enableEvents
=== true) {
146 this.initializeEventEmitter()
148 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
154 this.opts
.workerChoiceStrategy
,
155 this.opts
.workerChoiceStrategyOptions
160 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
163 this.starting
= false
164 if (this.opts
.startWorkers
=== true) {
168 this.startTimestamp
= performance
.now()
171 private checkNumberOfWorkers (numberOfWorkers
: number): void {
172 if (numberOfWorkers
== null) {
174 'Cannot instantiate a pool without specifying the number of workers'
176 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
178 'Cannot instantiate a pool with a non safe integer number of workers'
180 } else if (numberOfWorkers
< 0) {
181 throw new RangeError(
182 'Cannot instantiate a pool with a negative number of workers'
184 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
185 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
189 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
190 if (isPlainObject(opts
)) {
191 this.opts
.startWorkers
= opts
.startWorkers
?? true
192 checkValidWorkerChoiceStrategy(
193 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
195 this.opts
.workerChoiceStrategy
=
196 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
197 this.checkValidWorkerChoiceStrategyOptions(
198 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
200 this.opts
.workerChoiceStrategyOptions
= {
201 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
202 ...opts
.workerChoiceStrategyOptions
204 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
205 this.opts
.enableEvents
= opts
.enableEvents
?? true
206 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
207 if (this.opts
.enableTasksQueue
) {
208 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
209 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
210 opts
.tasksQueueOptions
as TasksQueueOptions
214 throw new TypeError('Invalid pool options: must be a plain object')
218 private checkValidWorkerChoiceStrategyOptions (
219 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
222 workerChoiceStrategyOptions
!= null &&
223 !isPlainObject(workerChoiceStrategyOptions
)
226 'Invalid worker choice strategy options: must be a plain object'
230 workerChoiceStrategyOptions
?.retries
!= null &&
231 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
234 'Invalid worker choice strategy options: retries must be an integer'
238 workerChoiceStrategyOptions
?.retries
!= null &&
239 workerChoiceStrategyOptions
.retries
< 0
241 throw new RangeError(
242 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
246 workerChoiceStrategyOptions
?.weights
!= null &&
247 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
250 'Invalid worker choice strategy options: must have a weight for each worker node'
254 workerChoiceStrategyOptions
?.measurement
!= null &&
255 !Object.values(Measurements
).includes(
256 workerChoiceStrategyOptions
.measurement
260 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
265 private initializeEventEmitter (): void {
266 this.emitter
= new EventEmitterAsyncResource({
267 name
: `poolifier:${this.type}-${this.worker}-pool`
272 public get
info (): PoolInfo
{
277 started
: this.started
,
279 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
280 minSize
: this.minSize
,
281 maxSize
: this.maxSize
,
282 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
283 .runTime
.aggregate
&&
284 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
285 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
286 workerNodes
: this.workerNodes
.length
,
287 idleWorkerNodes
: this.workerNodes
.reduce(
288 (accumulator
, workerNode
) =>
289 workerNode
.usage
.tasks
.executing
=== 0
294 busyWorkerNodes
: this.workerNodes
.reduce(
295 (accumulator
, workerNode
) =>
296 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
299 executedTasks
: this.workerNodes
.reduce(
300 (accumulator
, workerNode
) =>
301 accumulator
+ workerNode
.usage
.tasks
.executed
,
304 executingTasks
: this.workerNodes
.reduce(
305 (accumulator
, workerNode
) =>
306 accumulator
+ workerNode
.usage
.tasks
.executing
,
309 ...(this.opts
.enableTasksQueue
=== true && {
310 queuedTasks
: this.workerNodes
.reduce(
311 (accumulator
, workerNode
) =>
312 accumulator
+ workerNode
.usage
.tasks
.queued
,
316 ...(this.opts
.enableTasksQueue
=== true && {
317 maxQueuedTasks
: this.workerNodes
.reduce(
318 (accumulator
, workerNode
) =>
319 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
323 ...(this.opts
.enableTasksQueue
=== true && {
324 backPressure
: this.hasBackPressure()
326 ...(this.opts
.enableTasksQueue
=== true && {
327 stolenTasks
: this.workerNodes
.reduce(
328 (accumulator
, workerNode
) =>
329 accumulator
+ workerNode
.usage
.tasks
.stolen
,
333 failedTasks
: this.workerNodes
.reduce(
334 (accumulator
, workerNode
) =>
335 accumulator
+ workerNode
.usage
.tasks
.failed
,
338 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
339 .runTime
.aggregate
&& {
343 ...this.workerNodes
.map(
344 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
350 ...this.workerNodes
.map(
351 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
355 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
356 .runTime
.average
&& {
359 this.workerNodes
.reduce
<number[]>(
360 (accumulator
, workerNode
) =>
361 accumulator
.concat(workerNode
.usage
.runTime
.history
),
367 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
371 this.workerNodes
.reduce
<number[]>(
372 (accumulator
, workerNode
) =>
373 accumulator
.concat(workerNode
.usage
.runTime
.history
),
381 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
382 .waitTime
.aggregate
&& {
386 ...this.workerNodes
.map(
387 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
393 ...this.workerNodes
.map(
394 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
398 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
399 .waitTime
.average
&& {
402 this.workerNodes
.reduce
<number[]>(
403 (accumulator
, workerNode
) =>
404 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
410 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
411 .waitTime
.median
&& {
414 this.workerNodes
.reduce
<number[]>(
415 (accumulator
, workerNode
) =>
416 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
428 * The pool readiness boolean status.
430 private get
ready (): boolean {
432 this.workerNodes
.reduce(
433 (accumulator
, workerNode
) =>
434 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
443 * The approximate pool utilization.
445 * @returns The pool utilization.
447 private get
utilization (): number {
448 const poolTimeCapacity
=
449 (performance
.now() - this.startTimestamp
) * this.maxSize
450 const totalTasksRunTime
= this.workerNodes
.reduce(
451 (accumulator
, workerNode
) =>
452 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
455 const totalTasksWaitTime
= this.workerNodes
.reduce(
456 (accumulator
, workerNode
) =>
457 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
460 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
466 * If it is `'dynamic'`, it provides the `max` property.
468 protected abstract get
type (): PoolType
473 protected abstract get
worker (): WorkerType
476 * The pool minimum size.
478 protected get
minSize (): number {
479 return this.numberOfWorkers
483 * The pool maximum size.
485 protected get
maxSize (): number {
486 return this.max
?? this.numberOfWorkers
490 * Checks if the worker id sent in the received message from a worker is valid.
492 * @param message - The received message.
493 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
495 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
496 if (message
.workerId
== null) {
497 throw new Error('Worker message received without worker id')
498 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
500 `Worker message received from unknown worker '${message.workerId}'`
506 * Gets the given worker its worker node key.
508 * @param worker - The worker.
509 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
511 private getWorkerNodeKeyByWorker (worker
: Worker
): number {
512 return this.workerNodes
.findIndex(
513 workerNode
=> workerNode
.worker
=== worker
518 * Gets the worker node key given its worker id.
520 * @param workerId - The worker id.
521 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
523 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
524 return this.workerNodes
.findIndex(
525 workerNode
=> workerNode
.info
.id
=== workerId
530 public setWorkerChoiceStrategy (
531 workerChoiceStrategy
: WorkerChoiceStrategy
,
532 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
534 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
535 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
536 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
537 this.opts
.workerChoiceStrategy
539 if (workerChoiceStrategyOptions
!= null) {
540 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
542 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
543 workerNode
.resetUsage()
544 this.sendStatisticsMessageToWorker(workerNodeKey
)
549 public setWorkerChoiceStrategyOptions (
550 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
552 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
553 this.opts
.workerChoiceStrategyOptions
= {
554 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
555 ...workerChoiceStrategyOptions
557 this.workerChoiceStrategyContext
.setOptions(
558 this.opts
.workerChoiceStrategyOptions
563 public enableTasksQueue (
565 tasksQueueOptions
?: TasksQueueOptions
567 if (this.opts
.enableTasksQueue
=== true && !enable
) {
568 this.unsetTaskStealing()
569 this.unsetTasksStealingOnBackPressure()
570 this.flushTasksQueues()
572 this.opts
.enableTasksQueue
= enable
573 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
577 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
578 if (this.opts
.enableTasksQueue
=== true) {
579 checkValidTasksQueueOptions(tasksQueueOptions
)
580 this.opts
.tasksQueueOptions
=
581 this.buildTasksQueueOptions(tasksQueueOptions
)
582 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
583 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
584 this.setTaskStealing()
586 this.unsetTaskStealing()
588 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
589 this.setTasksStealingOnBackPressure()
591 this.unsetTasksStealingOnBackPressure()
593 } else if (this.opts
.tasksQueueOptions
!= null) {
594 delete this.opts
.tasksQueueOptions
598 private buildTasksQueueOptions (
599 tasksQueueOptions
: TasksQueueOptions
600 ): TasksQueueOptions
{
603 size
: Math.pow(this.maxSize
, 2),
606 tasksStealingOnBackPressure
: true
612 private setTasksQueueSize (size
: number): void {
613 for (const workerNode
of this.workerNodes
) {
614 workerNode
.tasksQueueBackPressureSize
= size
618 private setTaskStealing (): void {
619 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
620 this.workerNodes
[workerNodeKey
].addEventListener(
622 this.handleEmptyQueueEvent
as EventListener
627 private unsetTaskStealing (): void {
628 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
629 this.workerNodes
[workerNodeKey
].removeEventListener(
631 this.handleEmptyQueueEvent
as EventListener
636 private setTasksStealingOnBackPressure (): void {
637 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
638 this.workerNodes
[workerNodeKey
].addEventListener(
640 this.handleBackPressureEvent
as EventListener
645 private unsetTasksStealingOnBackPressure (): void {
646 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
647 this.workerNodes
[workerNodeKey
].removeEventListener(
649 this.handleBackPressureEvent
as EventListener
655 * Whether the pool is full or not.
657 * The pool filling boolean status.
659 protected get
full (): boolean {
660 return this.workerNodes
.length
>= this.maxSize
664 * Whether the pool is busy or not.
666 * The pool busyness boolean status.
668 protected abstract get
busy (): boolean
671 * Whether worker nodes are executing concurrently their tasks quota or not.
673 * @returns Worker nodes busyness boolean status.
675 protected internalBusy (): boolean {
676 if (this.opts
.enableTasksQueue
=== true) {
678 this.workerNodes
.findIndex(
680 workerNode
.info
.ready
&&
681 workerNode
.usage
.tasks
.executing
<
682 (this.opts
.tasksQueueOptions
?.concurrency
as number)
687 this.workerNodes
.findIndex(
689 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
694 private async sendTaskFunctionOperationToWorker (
695 workerNodeKey
: number,
696 message
: MessageValue
<Data
>
697 ): Promise
<boolean> {
698 return await new Promise
<boolean>((resolve
, reject
) => {
699 const taskFunctionOperationListener
= (
700 message
: MessageValue
<Response
>
702 this.checkMessageWorkerId(message
)
703 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
705 message
.taskFunctionOperationStatus
!= null &&
706 message
.workerId
=== workerId
708 if (message
.taskFunctionOperationStatus
) {
710 } else if (!message
.taskFunctionOperationStatus
) {
713 `Task function operation '${
714 message.taskFunctionOperation as string
715 }' failed on worker ${message.workerId} with error: '${
716 message.workerError?.message as string
721 this.deregisterWorkerMessageListener(
722 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
723 taskFunctionOperationListener
727 this.registerWorkerMessageListener(
729 taskFunctionOperationListener
731 this.sendToWorker(workerNodeKey
, message
)
735 private async sendTaskFunctionOperationToWorkers (
736 message
: MessageValue
<Data
>
737 ): Promise
<boolean> {
738 return await new Promise
<boolean>((resolve
, reject
) => {
739 const responsesReceived
= new Array<MessageValue
<Response
>>()
740 const taskFunctionOperationsListener
= (
741 message
: MessageValue
<Response
>
743 this.checkMessageWorkerId(message
)
744 if (message
.taskFunctionOperationStatus
!= null) {
745 responsesReceived
.push(message
)
746 if (responsesReceived
.length
=== this.workerNodes
.length
) {
748 responsesReceived
.every(
749 message
=> message
.taskFunctionOperationStatus
=== true
754 responsesReceived
.some(
755 message
=> message
.taskFunctionOperationStatus
=== false
758 const errorResponse
= responsesReceived
.find(
759 response
=> response
.taskFunctionOperationStatus
=== false
763 `Task function operation '${
764 message.taskFunctionOperation as string
765 }' failed on worker ${
766 errorResponse?.workerId as number
768 errorResponse?.workerError?.message as string
773 this.deregisterWorkerMessageListener(
774 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
775 taskFunctionOperationsListener
780 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
781 this.registerWorkerMessageListener(
783 taskFunctionOperationsListener
785 this.sendToWorker(workerNodeKey
, message
)
791 public hasTaskFunction (name
: string): boolean {
792 for (const workerNode
of this.workerNodes
) {
794 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
795 workerNode
.info
.taskFunctionNames
.includes(name
)
804 public async addTaskFunction (
806 fn
: TaskFunction
<Data
, Response
>
807 ): Promise
<boolean> {
808 if (typeof name
!== 'string') {
809 throw new TypeError('name argument must be a string')
811 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
812 throw new TypeError('name argument must not be an empty string')
814 if (typeof fn
!== 'function') {
815 throw new TypeError('fn argument must be a function')
817 const opResult
= await this.sendTaskFunctionOperationToWorkers({
818 taskFunctionOperation
: 'add',
819 taskFunctionName
: name
,
820 taskFunction
: fn
.toString()
822 this.taskFunctions
.set(name
, fn
)
827 public async removeTaskFunction (name
: string): Promise
<boolean> {
828 if (!this.taskFunctions
.has(name
)) {
830 'Cannot remove a task function not handled on the pool side'
833 const opResult
= await this.sendTaskFunctionOperationToWorkers({
834 taskFunctionOperation
: 'remove',
835 taskFunctionName
: name
837 this.deleteTaskFunctionWorkerUsages(name
)
838 this.taskFunctions
.delete(name
)
843 public listTaskFunctionNames (): string[] {
844 for (const workerNode
of this.workerNodes
) {
846 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
847 workerNode
.info
.taskFunctionNames
.length
> 0
849 return workerNode
.info
.taskFunctionNames
856 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
857 return await this.sendTaskFunctionOperationToWorkers({
858 taskFunctionOperation
: 'default',
859 taskFunctionName
: name
863 private deleteTaskFunctionWorkerUsages (name
: string): void {
864 for (const workerNode
of this.workerNodes
) {
865 workerNode
.deleteTaskFunctionWorkerUsage(name
)
869 private shallExecuteTask (workerNodeKey
: number): boolean {
871 this.tasksQueueSize(workerNodeKey
) === 0 &&
872 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
873 (this.opts
.tasksQueueOptions
?.concurrency
as number)
878 public async execute (
881 transferList
?: TransferListItem
[]
882 ): Promise
<Response
> {
883 return await new Promise
<Response
>((resolve
, reject
) => {
885 reject(new Error('Cannot execute a task on not started pool'))
888 if (name
!= null && typeof name
!== 'string') {
889 reject(new TypeError('name argument must be a string'))
894 typeof name
=== 'string' &&
895 name
.trim().length
=== 0
897 reject(new TypeError('name argument must not be an empty string'))
900 if (transferList
!= null && !Array.isArray(transferList
)) {
901 reject(new TypeError('transferList argument must be an array'))
904 const timestamp
= performance
.now()
905 const workerNodeKey
= this.chooseWorkerNode()
906 const task
: Task
<Data
> = {
907 name
: name
?? DEFAULT_TASK_NAME
,
908 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
909 data
: data
?? ({} as Data
),
914 this.promiseResponseMap
.set(task
.taskId
as string, {
920 this.opts
.enableTasksQueue
=== false ||
921 (this.opts
.enableTasksQueue
=== true &&
922 this.shallExecuteTask(workerNodeKey
))
924 this.executeTask(workerNodeKey
, task
)
926 this.enqueueTask(workerNodeKey
, task
)
932 public start (): void {
935 this.workerNodes
.reduce(
936 (accumulator
, workerNode
) =>
937 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
939 ) < this.numberOfWorkers
941 this.createAndSetupWorkerNode()
943 this.starting
= false
948 public async destroy (): Promise
<void> {
950 this.workerNodes
.map(async (_
, workerNodeKey
) => {
951 await this.destroyWorkerNode(workerNodeKey
)
954 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
955 this.emitter
?.emitDestroy()
959 protected async sendKillMessageToWorker (
960 workerNodeKey
: number
962 await new Promise
<void>((resolve
, reject
) => {
963 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
964 this.checkMessageWorkerId(message
)
965 if (message
.kill
=== 'success') {
967 } else if (message
.kill
=== 'failure') {
970 `Kill message handling failed on worker ${
971 message.workerId as number
977 // FIXME: should be registered only once
978 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
979 this.sendToWorker(workerNodeKey
, { kill
: true })
984 * Terminates the worker node given its worker node key.
986 * @param workerNodeKey - The worker node key.
988 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
991 * Setup hook to execute code before worker nodes are created in the abstract constructor.
996 protected setupHook (): void {
997 /* Intentionally empty */
1001 * Should return whether the worker is the main worker or not.
1003 protected abstract isMain (): boolean
1006 * Hook executed before the worker task execution.
1007 * Can be overridden.
1009 * @param workerNodeKey - The worker node key.
1010 * @param task - The task to execute.
1012 protected beforeTaskExecutionHook (
1013 workerNodeKey
: number,
1016 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1017 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1018 ++workerUsage
.tasks
.executing
1019 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1022 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1023 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1027 const taskFunctionWorkerUsage
= this.workerNodes
[
1029 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1030 ++taskFunctionWorkerUsage
.tasks
.executing
1031 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1036 * Hook executed after the worker task execution.
1037 * Can be overridden.
1039 * @param workerNodeKey - The worker node key.
1040 * @param message - The received message.
1042 protected afterTaskExecutionHook (
1043 workerNodeKey
: number,
1044 message
: MessageValue
<Response
>
1046 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1047 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1048 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1049 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1050 this.updateEluWorkerUsage(workerUsage
, message
)
1053 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1054 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1055 message
.taskPerformance
?.name
as string
1058 const taskFunctionWorkerUsage
= this.workerNodes
[
1060 ].getTaskFunctionWorkerUsage(
1061 message
.taskPerformance
?.name
as string
1063 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1064 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1065 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
1070 * Whether the worker node shall update its task function worker usage or not.
1072 * @param workerNodeKey - The worker node key.
1073 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1075 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1076 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1078 workerInfo
!= null &&
1079 Array.isArray(workerInfo
.taskFunctionNames
) &&
1080 workerInfo
.taskFunctionNames
.length
> 2
1084 private updateTaskStatisticsWorkerUsage (
1085 workerUsage
: WorkerUsage
,
1086 message
: MessageValue
<Response
>
1088 const workerTaskStatistics
= workerUsage
.tasks
1090 workerTaskStatistics
.executing
!= null &&
1091 workerTaskStatistics
.executing
> 0
1093 --workerTaskStatistics
.executing
1095 if (message
.workerError
== null) {
1096 ++workerTaskStatistics
.executed
1098 ++workerTaskStatistics
.failed
1102 private updateRunTimeWorkerUsage (
1103 workerUsage
: WorkerUsage
,
1104 message
: MessageValue
<Response
>
1106 if (message
.workerError
!= null) {
1109 updateMeasurementStatistics(
1110 workerUsage
.runTime
,
1111 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1112 message
.taskPerformance
?.runTime
?? 0
1116 private updateWaitTimeWorkerUsage (
1117 workerUsage
: WorkerUsage
,
1120 const timestamp
= performance
.now()
1121 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1122 updateMeasurementStatistics(
1123 workerUsage
.waitTime
,
1124 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1129 private updateEluWorkerUsage (
1130 workerUsage
: WorkerUsage
,
1131 message
: MessageValue
<Response
>
1133 if (message
.workerError
!= null) {
1136 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1137 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1138 updateMeasurementStatistics(
1139 workerUsage
.elu
.active
,
1140 eluTaskStatisticsRequirements
,
1141 message
.taskPerformance
?.elu
?.active
?? 0
1143 updateMeasurementStatistics(
1144 workerUsage
.elu
.idle
,
1145 eluTaskStatisticsRequirements
,
1146 message
.taskPerformance
?.elu
?.idle
?? 0
1148 if (eluTaskStatisticsRequirements
.aggregate
) {
1149 if (message
.taskPerformance
?.elu
!= null) {
1150 if (workerUsage
.elu
.utilization
!= null) {
1151 workerUsage
.elu
.utilization
=
1152 (workerUsage
.elu
.utilization
+
1153 message
.taskPerformance
.elu
.utilization
) /
1156 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
1163 * Chooses a worker node for the next task.
1165 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1167 * @returns The chosen worker node key
1169 private chooseWorkerNode (): number {
1170 if (this.shallCreateDynamicWorker()) {
1171 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1173 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1175 return workerNodeKey
1178 return this.workerChoiceStrategyContext
.execute()
1182 * Conditions for dynamic worker creation.
1184 * @returns Whether to create a dynamic worker or not.
1186 private shallCreateDynamicWorker (): boolean {
1187 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
1191 * Sends a message to worker given its worker node key.
1193 * @param workerNodeKey - The worker node key.
1194 * @param message - The message.
1195 * @param transferList - The optional array of transferable objects.
1197 protected abstract sendToWorker (
1198 workerNodeKey
: number,
1199 message
: MessageValue
<Data
>,
1200 transferList
?: TransferListItem
[]
1204 * Creates a new worker.
1206 * @returns Newly created worker.
1208 protected abstract createWorker (): Worker
1211 * Creates a new, completely set up worker node.
1213 * @returns New, completely set up worker node key.
1215 protected createAndSetupWorkerNode (): number {
1216 const worker
= this.createWorker()
1218 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1219 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1220 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1221 worker
.on('error', error
=> {
1222 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1223 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1224 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1225 this.emitter
?.emit(PoolEvents
.error
, error
)
1226 this.workerNodes
[workerNodeKey
].closeChannel()
1230 this.opts
.restartWorkerOnError
=== true
1232 if (workerInfo
.dynamic
) {
1233 this.createAndSetupDynamicWorkerNode()
1235 this.createAndSetupWorkerNode()
1238 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1239 this.redistributeQueuedTasks(workerNodeKey
)
1242 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1243 worker
.once('exit', () => {
1244 this.removeWorkerNode(worker
)
1247 const workerNodeKey
= this.addWorkerNode(worker
)
1249 this.afterWorkerNodeSetup(workerNodeKey
)
1251 return workerNodeKey
1255 * Creates a new, completely set up dynamic worker node.
1257 * @returns New, completely set up dynamic worker node key.
1259 protected createAndSetupDynamicWorkerNode (): number {
1260 const workerNodeKey
= this.createAndSetupWorkerNode()
1261 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1262 this.checkMessageWorkerId(message
)
1263 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1266 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1267 // Kill message received from worker
1269 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1270 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1271 ((this.opts
.enableTasksQueue
=== false &&
1272 workerUsage
.tasks
.executing
=== 0) ||
1273 (this.opts
.enableTasksQueue
=== true &&
1274 workerUsage
.tasks
.executing
=== 0 &&
1275 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1277 // Flag the worker node as not ready immediately
1278 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1279 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1280 this.emitter
?.emit(PoolEvents
.error
, error
)
1284 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1285 this.sendToWorker(workerNodeKey
, {
1288 if (this.taskFunctions
.size
> 0) {
1289 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1290 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1291 taskFunctionOperation
: 'add',
1293 taskFunction
: taskFunction
.toString()
1295 this.emitter
?.emit(PoolEvents
.error
, error
)
1299 workerInfo
.dynamic
= true
1301 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1302 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1304 workerInfo
.ready
= true
1306 this.checkAndEmitDynamicWorkerCreationEvents()
1307 return workerNodeKey
1311 * Registers a listener callback on the worker given its worker node key.
1313 * @param workerNodeKey - The worker node key.
1314 * @param listener - The message listener callback.
1316 protected abstract registerWorkerMessageListener
<
1317 Message
extends Data
| Response
1319 workerNodeKey
: number,
1320 listener
: (message
: MessageValue
<Message
>) => void
1324 * Registers once a listener callback on the worker given its worker node key.
1326 * @param workerNodeKey - The worker node key.
1327 * @param listener - The message listener callback.
1329 protected abstract registerOnceWorkerMessageListener
<
1330 Message
extends Data
| Response
1332 workerNodeKey
: number,
1333 listener
: (message
: MessageValue
<Message
>) => void
1337 * Deregisters a listener callback on the worker given its worker node key.
1339 * @param workerNodeKey - The worker node key.
1340 * @param listener - The message listener callback.
1342 protected abstract deregisterWorkerMessageListener
<
1343 Message
extends Data
| Response
1345 workerNodeKey
: number,
1346 listener
: (message
: MessageValue
<Message
>) => void
1350 * Method hooked up after a worker node has been newly created.
1351 * Can be overridden.
1353 * @param workerNodeKey - The newly created worker node key.
1355 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1356 // Listen to worker messages.
1357 this.registerWorkerMessageListener(
1359 this.workerMessageListener
.bind(this)
1361 // Send the startup message to worker.
1362 this.sendStartupMessageToWorker(workerNodeKey
)
1363 // Send the statistics message to worker.
1364 this.sendStatisticsMessageToWorker(workerNodeKey
)
1365 if (this.opts
.enableTasksQueue
=== true) {
1366 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1367 this.workerNodes
[workerNodeKey
].addEventListener(
1369 this.handleEmptyQueueEvent
as EventListener
1372 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1373 this.workerNodes
[workerNodeKey
].addEventListener(
1375 this.handleBackPressureEvent
as EventListener
1382 * Sends the startup message to worker given its worker node key.
1384 * @param workerNodeKey - The worker node key.
1386 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1389 * Sends the statistics message to worker given its worker node key.
1391 * @param workerNodeKey - The worker node key.
1393 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1394 this.sendToWorker(workerNodeKey
, {
1397 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1399 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1405 private redistributeQueuedTasks (workerNodeKey
: number): void {
1406 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1407 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1408 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1409 return workerNode
.info
.ready
&&
1410 workerNode
.usage
.tasks
.queued
<
1411 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1417 const task
= this.dequeueTask(workerNodeKey
) as Task
<Data
>
1418 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1419 this.executeTask(destinationWorkerNodeKey
, task
)
1421 this.enqueueTask(destinationWorkerNodeKey
, task
)
1426 private updateTaskStolenStatisticsWorkerUsage (
1427 workerNodeKey
: number,
1430 const workerNode
= this.workerNodes
[workerNodeKey
]
1431 if (workerNode
?.usage
!= null) {
1432 ++workerNode
.usage
.tasks
.stolen
1435 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1436 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1438 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1441 ++taskFunctionWorkerUsage
.tasks
.stolen
1445 private readonly handleEmptyQueueEvent
= (
1446 event
: CustomEvent
<WorkerNodeEventDetail
>
1448 const destinationWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1449 event
.detail
.workerId
1451 const workerNodes
= this.workerNodes
1454 (workerNodeA
, workerNodeB
) =>
1455 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1457 const sourceWorkerNode
= workerNodes
.find(
1459 workerNode
.info
.ready
&&
1460 workerNode
.info
.id
!== event
.detail
.workerId
&&
1461 workerNode
.usage
.tasks
.queued
> 0
1463 if (sourceWorkerNode
!= null) {
1464 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1465 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1466 this.executeTask(destinationWorkerNodeKey
, task
)
1468 this.enqueueTask(destinationWorkerNodeKey
, task
)
1470 this.updateTaskStolenStatisticsWorkerUsage(
1471 destinationWorkerNodeKey
,
1477 private readonly handleBackPressureEvent
= (
1478 event
: CustomEvent
<WorkerNodeEventDetail
>
1480 const sizeOffset
= 1
1481 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1484 const sourceWorkerNode
=
1485 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(event
.detail
.workerId
)]
1486 const workerNodes
= this.workerNodes
1489 (workerNodeA
, workerNodeB
) =>
1490 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1492 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1494 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1495 workerNode
.info
.ready
&&
1496 workerNode
.info
.id
!== event
.detail
.workerId
&&
1497 workerNode
.usage
.tasks
.queued
<
1498 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1500 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1501 if (this.shallExecuteTask(workerNodeKey
)) {
1502 this.executeTask(workerNodeKey
, task
)
1504 this.enqueueTask(workerNodeKey
, task
)
1506 this.updateTaskStolenStatisticsWorkerUsage(
1515 * This method is the message listener registered on each worker.
1517 protected workerMessageListener (message
: MessageValue
<Response
>): void {
1518 this.checkMessageWorkerId(message
)
1519 if (message
.ready
!= null && message
.taskFunctionNames
!= null) {
1520 // Worker ready response received from worker
1521 this.handleWorkerReadyResponse(message
)
1522 } else if (message
.taskId
!= null) {
1523 // Task execution response received from worker
1524 this.handleTaskExecutionResponse(message
)
1525 } else if (message
.taskFunctionNames
!= null) {
1526 // Task function names message received from worker
1528 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1529 ).taskFunctionNames
= message
.taskFunctionNames
1533 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1534 if (message
.ready
=== false) {
1536 `Worker ${message.workerId as number} failed to initialize`
1539 const workerInfo
= this.getWorkerInfo(
1540 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1542 workerInfo
.ready
= message
.ready
as boolean
1543 workerInfo
.taskFunctionNames
= message
.taskFunctionNames
1545 this.emitter
?.emit(PoolEvents
.ready
, this.info
)
1549 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1550 const { taskId
, workerError
, data
} = message
1551 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1552 if (promiseResponse
!= null) {
1553 if (workerError
!= null) {
1554 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1555 promiseResponse
.reject(workerError
.message
)
1557 promiseResponse
.resolve(data
as Response
)
1559 const workerNodeKey
= promiseResponse
.workerNodeKey
1560 this.afterTaskExecutionHook(workerNodeKey
, message
)
1561 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1562 this.promiseResponseMap
.delete(taskId
as string)
1564 this.opts
.enableTasksQueue
=== true &&
1565 this.tasksQueueSize(workerNodeKey
) > 0 &&
1566 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
1567 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1571 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1577 private checkAndEmitTaskExecutionEvents (): void {
1579 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1583 private checkAndEmitTaskQueuingEvents (): void {
1584 if (this.hasBackPressure()) {
1585 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1589 private checkAndEmitDynamicWorkerCreationEvents (): void {
1590 if (this.type === PoolTypes
.dynamic
) {
1592 this.emitter
?.emit(PoolEvents
.full
, this.info
)
1598 * Gets the worker information given its worker node key.
1600 * @param workerNodeKey - The worker node key.
1601 * @returns The worker information.
1603 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
{
1604 return this.workerNodes
[workerNodeKey
]?.info
1608 * Adds the given worker in the pool worker nodes.
1610 * @param worker - The worker.
1611 * @returns The added worker node key.
1612 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1614 private addWorkerNode (worker
: Worker
): number {
1615 const workerNode
= new WorkerNode
<Worker
, Data
>(
1617 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1619 // Flag the worker node as ready at pool startup.
1620 if (this.starting
) {
1621 workerNode
.info
.ready
= true
1623 this.workerNodes
.push(workerNode
)
1624 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1625 if (workerNodeKey
=== -1) {
1626 throw new Error('Worker added not found in worker nodes')
1628 return workerNodeKey
1632 * Removes the given worker from the pool worker nodes.
1634 * @param worker - The worker.
1636 private removeWorkerNode (worker
: Worker
): void {
1637 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1638 if (workerNodeKey
!== -1) {
1639 this.workerNodes
.splice(workerNodeKey
, 1)
1640 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
1644 protected flagWorkerNodeAsNotReady (workerNodeKey
: number): void {
1645 this.getWorkerInfo(workerNodeKey
).ready
= false
1649 public hasWorkerNodeBackPressure (workerNodeKey
: number): boolean {
1651 this.opts
.enableTasksQueue
=== true &&
1652 this.workerNodes
[workerNodeKey
].hasBackPressure()
1656 private hasBackPressure (): boolean {
1658 this.opts
.enableTasksQueue
=== true &&
1659 this.workerNodes
.findIndex(
1660 workerNode
=> !workerNode
.hasBackPressure()
1666 * Executes the given task on the worker given its worker node key.
1668 * @param workerNodeKey - The worker node key.
1669 * @param task - The task to execute.
1671 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1672 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1673 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1674 this.checkAndEmitTaskExecutionEvents()
1677 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1678 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1679 this.checkAndEmitTaskQueuingEvents()
1680 return tasksQueueSize
1683 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1684 return this.workerNodes
[workerNodeKey
].dequeueTask()
1687 private tasksQueueSize (workerNodeKey
: number): number {
1688 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1691 protected flushTasksQueue (workerNodeKey
: number): void {
1692 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1695 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1698 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1701 private flushTasksQueues (): void {
1702 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1703 this.flushTasksQueue(workerNodeKey
)