1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
23 import { KillBehaviors
} from
'../worker/worker-options'
24 import type { TaskFunction
} from
'../worker/task-functions'
32 type TasksQueueOptions
38 WorkerNodeEventDetail
,
43 type MeasurementStatisticsRequirements
,
45 WorkerChoiceStrategies
,
46 type WorkerChoiceStrategy
,
47 type WorkerChoiceStrategyOptions
48 } from
'./selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
50 import { version
} from
'./version'
51 import { WorkerNode
} from
'./worker-node'
54 checkValidTasksQueueOptions
,
55 checkValidWorkerChoiceStrategy
,
56 updateMeasurementStatistics
60 * Base class that implements some shared logic for all poolifier pools.
62 * @typeParam Worker - Type of worker which manages this pool.
63 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
64 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
66 export abstract class AbstractPool
<
67 Worker
extends IWorker
,
70 > implements IPool
<Worker
, Data
, Response
> {
72 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
75 public emitter
?: EventEmitterAsyncResource
78 * Dynamic pool maximum size property placeholder.
80 protected readonly max
?: number
83 * The task execution response promise map:
84 * - `key`: The message id of each submitted task.
85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
89 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
90 new Map
<string, PromiseResponseWrapper
<Response
>>()
93 * Worker choice strategy context referencing a worker choice algorithm implementation.
95 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
106 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
109 * Whether the pool is started or not.
111 private started
: boolean
113 * Whether the pool is starting or not.
115 private starting
: boolean
117 * The start timestamp of the pool.
119 private readonly startTimestamp
122 * Constructs a new poolifier pool.
124 * @param numberOfWorkers - Number of workers that this pool should manage.
125 * @param filePath - Path to the worker file.
126 * @param opts - Options for the pool.
129 protected readonly numberOfWorkers
: number,
130 protected readonly filePath
: string,
131 protected readonly opts
: PoolOptions
<Worker
>
133 if (!this.isMain()) {
135 'Cannot start a pool from a worker with the same type as the pool'
138 checkFilePath(this.filePath
)
139 this.checkNumberOfWorkers(this.numberOfWorkers
)
140 this.checkPoolOptions(this.opts
)
142 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
143 this.executeTask
= this.executeTask
.bind(this)
144 this.enqueueTask
= this.enqueueTask
.bind(this)
146 if (this.opts
.enableEvents
=== true) {
147 this.initializeEventEmitter()
149 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
155 this.opts
.workerChoiceStrategy
,
156 this.opts
.workerChoiceStrategyOptions
161 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
164 this.starting
= false
165 if (this.opts
.startWorkers
=== true) {
169 this.startTimestamp
= performance
.now()
172 private checkNumberOfWorkers (numberOfWorkers
: number): void {
173 if (numberOfWorkers
== null) {
175 'Cannot instantiate a pool without specifying the number of workers'
177 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
179 'Cannot instantiate a pool with a non safe integer number of workers'
181 } else if (numberOfWorkers
< 0) {
182 throw new RangeError(
183 'Cannot instantiate a pool with a negative number of workers'
185 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
186 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
190 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
191 if (isPlainObject(opts
)) {
192 this.opts
.startWorkers
= opts
.startWorkers
?? true
193 checkValidWorkerChoiceStrategy(
194 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
196 this.opts
.workerChoiceStrategy
=
197 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
198 this.checkValidWorkerChoiceStrategyOptions(
199 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
201 this.opts
.workerChoiceStrategyOptions
= {
202 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
203 ...opts
.workerChoiceStrategyOptions
205 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
206 this.opts
.enableEvents
= opts
.enableEvents
?? true
207 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
208 if (this.opts
.enableTasksQueue
) {
209 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
210 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
211 opts
.tasksQueueOptions
as TasksQueueOptions
215 throw new TypeError('Invalid pool options: must be a plain object')
219 private checkValidWorkerChoiceStrategyOptions (
220 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
223 workerChoiceStrategyOptions
!= null &&
224 !isPlainObject(workerChoiceStrategyOptions
)
227 'Invalid worker choice strategy options: must be a plain object'
231 workerChoiceStrategyOptions
?.retries
!= null &&
232 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
235 'Invalid worker choice strategy options: retries must be an integer'
239 workerChoiceStrategyOptions
?.retries
!= null &&
240 workerChoiceStrategyOptions
.retries
< 0
242 throw new RangeError(
243 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
247 workerChoiceStrategyOptions
?.weights
!= null &&
248 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
251 'Invalid worker choice strategy options: must have a weight for each worker node'
255 workerChoiceStrategyOptions
?.measurement
!= null &&
256 !Object.values(Measurements
).includes(
257 workerChoiceStrategyOptions
.measurement
261 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
266 private initializeEventEmitter (): void {
267 this.emitter
= new EventEmitterAsyncResource({
268 name
: `poolifier:${this.type}-${this.worker}-pool`
273 public get
info (): PoolInfo
{
278 started
: this.started
,
280 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
281 minSize
: this.minSize
,
282 maxSize
: this.maxSize
,
283 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
284 .runTime
.aggregate
&&
285 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
286 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
287 workerNodes
: this.workerNodes
.length
,
288 idleWorkerNodes
: this.workerNodes
.reduce(
289 (accumulator
, workerNode
) =>
290 workerNode
.usage
.tasks
.executing
=== 0
295 busyWorkerNodes
: this.workerNodes
.reduce(
296 (accumulator
, workerNode
) =>
297 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
300 executedTasks
: this.workerNodes
.reduce(
301 (accumulator
, workerNode
) =>
302 accumulator
+ workerNode
.usage
.tasks
.executed
,
305 executingTasks
: this.workerNodes
.reduce(
306 (accumulator
, workerNode
) =>
307 accumulator
+ workerNode
.usage
.tasks
.executing
,
310 ...(this.opts
.enableTasksQueue
=== true && {
311 queuedTasks
: this.workerNodes
.reduce(
312 (accumulator
, workerNode
) =>
313 accumulator
+ workerNode
.usage
.tasks
.queued
,
317 ...(this.opts
.enableTasksQueue
=== true && {
318 maxQueuedTasks
: this.workerNodes
.reduce(
319 (accumulator
, workerNode
) =>
320 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
324 ...(this.opts
.enableTasksQueue
=== true && {
325 backPressure
: this.hasBackPressure()
327 ...(this.opts
.enableTasksQueue
=== true && {
328 stolenTasks
: this.workerNodes
.reduce(
329 (accumulator
, workerNode
) =>
330 accumulator
+ workerNode
.usage
.tasks
.stolen
,
334 failedTasks
: this.workerNodes
.reduce(
335 (accumulator
, workerNode
) =>
336 accumulator
+ workerNode
.usage
.tasks
.failed
,
339 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
340 .runTime
.aggregate
&& {
344 ...this.workerNodes
.map(
345 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
351 ...this.workerNodes
.map(
352 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
356 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
357 .runTime
.average
&& {
360 this.workerNodes
.reduce
<number[]>(
361 (accumulator
, workerNode
) =>
362 accumulator
.concat(workerNode
.usage
.runTime
.history
),
368 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
372 this.workerNodes
.reduce
<number[]>(
373 (accumulator
, workerNode
) =>
374 accumulator
.concat(workerNode
.usage
.runTime
.history
),
382 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
383 .waitTime
.aggregate
&& {
387 ...this.workerNodes
.map(
388 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
394 ...this.workerNodes
.map(
395 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
399 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
400 .waitTime
.average
&& {
403 this.workerNodes
.reduce
<number[]>(
404 (accumulator
, workerNode
) =>
405 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
411 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
412 .waitTime
.median
&& {
415 this.workerNodes
.reduce
<number[]>(
416 (accumulator
, workerNode
) =>
417 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
429 * The pool readiness boolean status.
431 private get
ready (): boolean {
433 this.workerNodes
.reduce(
434 (accumulator
, workerNode
) =>
435 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
444 * The approximate pool utilization.
446 * @returns The pool utilization.
448 private get
utilization (): number {
449 const poolTimeCapacity
=
450 (performance
.now() - this.startTimestamp
) * this.maxSize
451 const totalTasksRunTime
= this.workerNodes
.reduce(
452 (accumulator
, workerNode
) =>
453 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
456 const totalTasksWaitTime
= this.workerNodes
.reduce(
457 (accumulator
, workerNode
) =>
458 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
461 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
467 * If it is `'dynamic'`, it provides the `max` property.
469 protected abstract get
type (): PoolType
474 protected abstract get
worker (): WorkerType
477 * The pool minimum size.
479 protected get
minSize (): number {
480 return this.numberOfWorkers
484 * The pool maximum size.
486 protected get
maxSize (): number {
487 return this.max
?? this.numberOfWorkers
491 * Checks if the worker id sent in the received message from a worker is valid.
493 * @param message - The received message.
494 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
496 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
497 if (message
.workerId
== null) {
498 throw new Error('Worker message received without worker id')
499 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
501 `Worker message received from unknown worker '${message.workerId}'`
507 * Gets the given worker its worker node key.
509 * @param worker - The worker.
510 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
512 private getWorkerNodeKeyByWorker (worker
: Worker
): number {
513 return this.workerNodes
.findIndex(
514 workerNode
=> workerNode
.worker
=== worker
519 * Gets the worker node key given its worker id.
521 * @param workerId - The worker id.
522 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
524 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
525 return this.workerNodes
.findIndex(
526 workerNode
=> workerNode
.info
.id
=== workerId
531 public setWorkerChoiceStrategy (
532 workerChoiceStrategy
: WorkerChoiceStrategy
,
533 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
535 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
536 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
537 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
538 this.opts
.workerChoiceStrategy
540 if (workerChoiceStrategyOptions
!= null) {
541 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
543 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
544 workerNode
.resetUsage()
545 this.sendStatisticsMessageToWorker(workerNodeKey
)
550 public setWorkerChoiceStrategyOptions (
551 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
553 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
554 this.opts
.workerChoiceStrategyOptions
= {
555 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
556 ...workerChoiceStrategyOptions
558 this.workerChoiceStrategyContext
.setOptions(
559 this.opts
.workerChoiceStrategyOptions
564 public enableTasksQueue (
566 tasksQueueOptions
?: TasksQueueOptions
568 if (this.opts
.enableTasksQueue
=== true && !enable
) {
569 this.unsetTaskStealing()
570 this.unsetTasksStealingOnBackPressure()
571 this.flushTasksQueues()
573 this.opts
.enableTasksQueue
= enable
574 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
578 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
579 if (this.opts
.enableTasksQueue
=== true) {
580 checkValidTasksQueueOptions(tasksQueueOptions
)
581 this.opts
.tasksQueueOptions
=
582 this.buildTasksQueueOptions(tasksQueueOptions
)
583 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
584 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
585 this.setTaskStealing()
587 this.unsetTaskStealing()
589 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
590 this.setTasksStealingOnBackPressure()
592 this.unsetTasksStealingOnBackPressure()
594 } else if (this.opts
.tasksQueueOptions
!= null) {
595 delete this.opts
.tasksQueueOptions
599 private buildTasksQueueOptions (
600 tasksQueueOptions
: TasksQueueOptions
601 ): TasksQueueOptions
{
604 size
: Math.pow(this.maxSize
, 2),
607 tasksStealingOnBackPressure
: true
613 private setTasksQueueSize (size
: number): void {
614 for (const workerNode
of this.workerNodes
) {
615 workerNode
.tasksQueueBackPressureSize
= size
619 private setTaskStealing (): void {
620 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
621 this.workerNodes
[workerNodeKey
].addEventListener(
623 this.handleEmptyQueueEvent
as EventListener
628 private unsetTaskStealing (): void {
629 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
630 this.workerNodes
[workerNodeKey
].removeEventListener(
632 this.handleEmptyQueueEvent
as EventListener
637 private setTasksStealingOnBackPressure (): void {
638 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
639 this.workerNodes
[workerNodeKey
].addEventListener(
641 this.handleBackPressureEvent
as EventListener
646 private unsetTasksStealingOnBackPressure (): void {
647 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
648 this.workerNodes
[workerNodeKey
].removeEventListener(
650 this.handleBackPressureEvent
as EventListener
656 * Whether the pool is full or not.
658 * The pool filling boolean status.
660 protected get
full (): boolean {
661 return this.workerNodes
.length
>= this.maxSize
665 * Whether the pool is busy or not.
667 * The pool busyness boolean status.
669 protected abstract get
busy (): boolean
672 * Whether worker nodes are executing concurrently their tasks quota or not.
674 * @returns Worker nodes busyness boolean status.
676 protected internalBusy (): boolean {
677 if (this.opts
.enableTasksQueue
=== true) {
679 this.workerNodes
.findIndex(
681 workerNode
.info
.ready
&&
682 workerNode
.usage
.tasks
.executing
<
683 (this.opts
.tasksQueueOptions
?.concurrency
as number)
688 this.workerNodes
.findIndex(
690 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
695 private async sendTaskFunctionOperationToWorker (
696 workerNodeKey
: number,
697 message
: MessageValue
<Data
>
698 ): Promise
<boolean> {
699 return await new Promise
<boolean>((resolve
, reject
) => {
700 const taskFunctionOperationListener
= (
701 message
: MessageValue
<Response
>
703 this.checkMessageWorkerId(message
)
704 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
706 message
.taskFunctionOperationStatus
!= null &&
707 message
.workerId
=== workerId
709 if (message
.taskFunctionOperationStatus
) {
711 } else if (!message
.taskFunctionOperationStatus
) {
714 `Task function operation '${
715 message.taskFunctionOperation as string
716 }' failed on worker ${message.workerId} with error: '${
717 message.workerError?.message as string
722 this.deregisterWorkerMessageListener(
723 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
724 taskFunctionOperationListener
728 this.registerWorkerMessageListener(
730 taskFunctionOperationListener
732 this.sendToWorker(workerNodeKey
, message
)
736 private async sendTaskFunctionOperationToWorkers (
737 message
: MessageValue
<Data
>
738 ): Promise
<boolean> {
739 return await new Promise
<boolean>((resolve
, reject
) => {
740 const responsesReceived
= new Array<MessageValue
<Response
>>()
741 const taskFunctionOperationsListener
= (
742 message
: MessageValue
<Response
>
744 this.checkMessageWorkerId(message
)
745 if (message
.taskFunctionOperationStatus
!= null) {
746 responsesReceived
.push(message
)
747 if (responsesReceived
.length
=== this.workerNodes
.length
) {
749 responsesReceived
.every(
750 message
=> message
.taskFunctionOperationStatus
=== true
755 responsesReceived
.some(
756 message
=> message
.taskFunctionOperationStatus
=== false
759 const errorResponse
= responsesReceived
.find(
760 response
=> response
.taskFunctionOperationStatus
=== false
764 `Task function operation '${
765 message.taskFunctionOperation as string
766 }' failed on worker ${
767 errorResponse?.workerId as number
769 errorResponse?.workerError?.message as string
774 this.deregisterWorkerMessageListener(
775 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
776 taskFunctionOperationsListener
781 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
782 this.registerWorkerMessageListener(
784 taskFunctionOperationsListener
786 this.sendToWorker(workerNodeKey
, message
)
792 public hasTaskFunction (name
: string): boolean {
793 for (const workerNode
of this.workerNodes
) {
795 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
796 workerNode
.info
.taskFunctionNames
.includes(name
)
805 public async addTaskFunction (
807 fn
: TaskFunction
<Data
, Response
>
808 ): Promise
<boolean> {
809 if (typeof name
!== 'string') {
810 throw new TypeError('name argument must be a string')
812 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
813 throw new TypeError('name argument must not be an empty string')
815 if (typeof fn
!== 'function') {
816 throw new TypeError('fn argument must be a function')
818 const opResult
= await this.sendTaskFunctionOperationToWorkers({
819 taskFunctionOperation
: 'add',
820 taskFunctionName
: name
,
821 taskFunction
: fn
.toString()
823 this.taskFunctions
.set(name
, fn
)
828 public async removeTaskFunction (name
: string): Promise
<boolean> {
829 if (!this.taskFunctions
.has(name
)) {
831 'Cannot remove a task function not handled on the pool side'
834 const opResult
= await this.sendTaskFunctionOperationToWorkers({
835 taskFunctionOperation
: 'remove',
836 taskFunctionName
: name
838 this.deleteTaskFunctionWorkerUsages(name
)
839 this.taskFunctions
.delete(name
)
844 public listTaskFunctionNames (): string[] {
845 for (const workerNode
of this.workerNodes
) {
847 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
848 workerNode
.info
.taskFunctionNames
.length
> 0
850 return workerNode
.info
.taskFunctionNames
857 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
858 return await this.sendTaskFunctionOperationToWorkers({
859 taskFunctionOperation
: 'default',
860 taskFunctionName
: name
864 private deleteTaskFunctionWorkerUsages (name
: string): void {
865 for (const workerNode
of this.workerNodes
) {
866 workerNode
.deleteTaskFunctionWorkerUsage(name
)
870 private shallExecuteTask (workerNodeKey
: number): boolean {
872 this.tasksQueueSize(workerNodeKey
) === 0 &&
873 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
874 (this.opts
.tasksQueueOptions
?.concurrency
as number)
879 public async execute (
882 transferList
?: TransferListItem
[]
883 ): Promise
<Response
> {
884 return await new Promise
<Response
>((resolve
, reject
) => {
886 reject(new Error('Cannot execute a task on not started pool'))
889 if (name
!= null && typeof name
!== 'string') {
890 reject(new TypeError('name argument must be a string'))
895 typeof name
=== 'string' &&
896 name
.trim().length
=== 0
898 reject(new TypeError('name argument must not be an empty string'))
901 if (transferList
!= null && !Array.isArray(transferList
)) {
902 reject(new TypeError('transferList argument must be an array'))
905 const timestamp
= performance
.now()
906 const workerNodeKey
= this.chooseWorkerNode()
907 const task
: Task
<Data
> = {
908 name
: name
?? DEFAULT_TASK_NAME
,
909 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
910 data
: data
?? ({} as Data
),
915 this.promiseResponseMap
.set(task
.taskId
as string, {
921 this.opts
.enableTasksQueue
=== false ||
922 (this.opts
.enableTasksQueue
=== true &&
923 this.shallExecuteTask(workerNodeKey
))
925 this.executeTask(workerNodeKey
, task
)
927 this.enqueueTask(workerNodeKey
, task
)
933 public start (): void {
936 this.workerNodes
.reduce(
937 (accumulator
, workerNode
) =>
938 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
940 ) < this.numberOfWorkers
942 this.createAndSetupWorkerNode()
944 this.starting
= false
949 public async destroy (): Promise
<void> {
951 this.workerNodes
.map(async (_
, workerNodeKey
) => {
952 await this.destroyWorkerNode(workerNodeKey
)
955 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
956 this.emitter
?.emitDestroy()
960 protected async sendKillMessageToWorker (
961 workerNodeKey
: number
963 await new Promise
<void>((resolve
, reject
) => {
964 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
965 this.checkMessageWorkerId(message
)
966 if (message
.kill
=== 'success') {
968 } else if (message
.kill
=== 'failure') {
971 `Kill message handling failed on worker ${
972 message.workerId as number
978 // FIXME: should be registered only once
979 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
980 this.sendToWorker(workerNodeKey
, { kill
: true })
985 * Terminates the worker node given its worker node key.
987 * @param workerNodeKey - The worker node key.
989 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
992 * Setup hook to execute code before worker nodes are created in the abstract constructor.
997 protected setupHook (): void {
998 /* Intentionally empty */
1002 * Should return whether the worker is the main worker or not.
1004 protected abstract isMain (): boolean
1007 * Hook executed before the worker task execution.
1008 * Can be overridden.
1010 * @param workerNodeKey - The worker node key.
1011 * @param task - The task to execute.
1013 protected beforeTaskExecutionHook (
1014 workerNodeKey
: number,
1017 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1018 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1019 ++workerUsage
.tasks
.executing
1020 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1023 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1024 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1028 const taskFunctionWorkerUsage
= this.workerNodes
[
1030 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1031 ++taskFunctionWorkerUsage
.tasks
.executing
1032 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1037 * Hook executed after the worker task execution.
1038 * Can be overridden.
1040 * @param workerNodeKey - The worker node key.
1041 * @param message - The received message.
1043 protected afterTaskExecutionHook (
1044 workerNodeKey
: number,
1045 message
: MessageValue
<Response
>
1047 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1048 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1049 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1050 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1051 this.updateEluWorkerUsage(workerUsage
, message
)
1054 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1055 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1056 message
.taskPerformance
?.name
as string
1059 const taskFunctionWorkerUsage
= this.workerNodes
[
1061 ].getTaskFunctionWorkerUsage(
1062 message
.taskPerformance
?.name
as string
1064 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1065 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1066 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
1071 * Whether the worker node shall update its task function worker usage or not.
1073 * @param workerNodeKey - The worker node key.
1074 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1076 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1077 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1079 workerInfo
!= null &&
1080 Array.isArray(workerInfo
.taskFunctionNames
) &&
1081 workerInfo
.taskFunctionNames
.length
> 2
1085 private updateTaskStatisticsWorkerUsage (
1086 workerUsage
: WorkerUsage
,
1087 message
: MessageValue
<Response
>
1089 const workerTaskStatistics
= workerUsage
.tasks
1091 workerTaskStatistics
.executing
!= null &&
1092 workerTaskStatistics
.executing
> 0
1094 --workerTaskStatistics
.executing
1096 if (message
.workerError
== null) {
1097 ++workerTaskStatistics
.executed
1099 ++workerTaskStatistics
.failed
1103 private updateRunTimeWorkerUsage (
1104 workerUsage
: WorkerUsage
,
1105 message
: MessageValue
<Response
>
1107 if (message
.workerError
!= null) {
1110 updateMeasurementStatistics(
1111 workerUsage
.runTime
,
1112 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1113 message
.taskPerformance
?.runTime
?? 0
1117 private updateWaitTimeWorkerUsage (
1118 workerUsage
: WorkerUsage
,
1121 const timestamp
= performance
.now()
1122 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1123 updateMeasurementStatistics(
1124 workerUsage
.waitTime
,
1125 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1130 private updateEluWorkerUsage (
1131 workerUsage
: WorkerUsage
,
1132 message
: MessageValue
<Response
>
1134 if (message
.workerError
!= null) {
1137 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1138 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1139 updateMeasurementStatistics(
1140 workerUsage
.elu
.active
,
1141 eluTaskStatisticsRequirements
,
1142 message
.taskPerformance
?.elu
?.active
?? 0
1144 updateMeasurementStatistics(
1145 workerUsage
.elu
.idle
,
1146 eluTaskStatisticsRequirements
,
1147 message
.taskPerformance
?.elu
?.idle
?? 0
1149 if (eluTaskStatisticsRequirements
.aggregate
) {
1150 if (message
.taskPerformance
?.elu
!= null) {
1151 if (workerUsage
.elu
.utilization
!= null) {
1152 workerUsage
.elu
.utilization
=
1153 (workerUsage
.elu
.utilization
+
1154 message
.taskPerformance
.elu
.utilization
) /
1157 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
1164 * Chooses a worker node for the next task.
1166 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1168 * @returns The chosen worker node key
1170 private chooseWorkerNode (): number {
1171 if (this.shallCreateDynamicWorker()) {
1172 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1174 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1176 return workerNodeKey
1179 return this.workerChoiceStrategyContext
.execute()
1183 * Conditions for dynamic worker creation.
1185 * @returns Whether to create a dynamic worker or not.
1187 private shallCreateDynamicWorker (): boolean {
1188 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
1192 * Sends a message to worker given its worker node key.
1194 * @param workerNodeKey - The worker node key.
1195 * @param message - The message.
1196 * @param transferList - The optional array of transferable objects.
1198 protected abstract sendToWorker (
1199 workerNodeKey
: number,
1200 message
: MessageValue
<Data
>,
1201 transferList
?: TransferListItem
[]
1205 * Creates a new worker.
1207 * @returns Newly created worker.
1209 protected abstract createWorker (): Worker
1212 * Creates a new, completely set up worker node.
1214 * @returns New, completely set up worker node key.
1216 protected createAndSetupWorkerNode (): number {
1217 const worker
= this.createWorker()
1219 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1220 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1221 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1222 worker
.on('error', error
=> {
1223 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1224 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1225 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1226 this.emitter
?.emit(PoolEvents
.error
, error
)
1227 this.workerNodes
[workerNodeKey
].closeChannel()
1231 this.opts
.restartWorkerOnError
=== true
1233 if (workerInfo
.dynamic
) {
1234 this.createAndSetupDynamicWorkerNode()
1236 this.createAndSetupWorkerNode()
1239 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1240 this.redistributeQueuedTasks(workerNodeKey
)
1243 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1244 worker
.once('exit', () => {
1245 this.removeWorkerNode(worker
)
1248 const workerNodeKey
= this.addWorkerNode(worker
)
1250 this.afterWorkerNodeSetup(workerNodeKey
)
1252 return workerNodeKey
1256 * Creates a new, completely set up dynamic worker node.
1258 * @returns New, completely set up dynamic worker node key.
1260 protected createAndSetupDynamicWorkerNode (): number {
1261 const workerNodeKey
= this.createAndSetupWorkerNode()
1262 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1263 this.checkMessageWorkerId(message
)
1264 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1267 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1268 // Kill message received from worker
1270 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1271 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1272 ((this.opts
.enableTasksQueue
=== false &&
1273 workerUsage
.tasks
.executing
=== 0) ||
1274 (this.opts
.enableTasksQueue
=== true &&
1275 workerUsage
.tasks
.executing
=== 0 &&
1276 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1278 // Flag the worker node as not ready immediately
1279 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1280 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1281 this.emitter
?.emit(PoolEvents
.error
, error
)
1285 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1286 this.sendToWorker(workerNodeKey
, {
1289 if (this.taskFunctions
.size
> 0) {
1290 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1291 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1292 taskFunctionOperation
: 'add',
1294 taskFunction
: taskFunction
.toString()
1296 this.emitter
?.emit(PoolEvents
.error
, error
)
1300 workerInfo
.dynamic
= true
1302 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1303 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1305 workerInfo
.ready
= true
1307 this.checkAndEmitDynamicWorkerCreationEvents()
1308 return workerNodeKey
1312 * Registers a listener callback on the worker given its worker node key.
1314 * @param workerNodeKey - The worker node key.
1315 * @param listener - The message listener callback.
1317 protected abstract registerWorkerMessageListener
<
1318 Message
extends Data
| Response
1320 workerNodeKey
: number,
1321 listener
: (message
: MessageValue
<Message
>) => void
1325 * Registers once a listener callback on the worker given its worker node key.
1327 * @param workerNodeKey - The worker node key.
1328 * @param listener - The message listener callback.
1330 protected abstract registerOnceWorkerMessageListener
<
1331 Message
extends Data
| Response
1333 workerNodeKey
: number,
1334 listener
: (message
: MessageValue
<Message
>) => void
1338 * Deregisters a listener callback on the worker given its worker node key.
1340 * @param workerNodeKey - The worker node key.
1341 * @param listener - The message listener callback.
1343 protected abstract deregisterWorkerMessageListener
<
1344 Message
extends Data
| Response
1346 workerNodeKey
: number,
1347 listener
: (message
: MessageValue
<Message
>) => void
1351 * Method hooked up after a worker node has been newly created.
1352 * Can be overridden.
1354 * @param workerNodeKey - The newly created worker node key.
1356 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1357 // Listen to worker messages.
1358 this.registerWorkerMessageListener(
1360 this.workerMessageListener
.bind(this)
1362 // Send the startup message to worker.
1363 this.sendStartupMessageToWorker(workerNodeKey
)
1364 // Send the statistics message to worker.
1365 this.sendStatisticsMessageToWorker(workerNodeKey
)
1366 if (this.opts
.enableTasksQueue
=== true) {
1367 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1368 this.workerNodes
[workerNodeKey
].addEventListener(
1370 this.handleEmptyQueueEvent
as EventListener
1373 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1374 this.workerNodes
[workerNodeKey
].addEventListener(
1376 this.handleBackPressureEvent
as EventListener
1383 * Sends the startup message to worker given its worker node key.
1385 * @param workerNodeKey - The worker node key.
1387 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1390 * Sends the statistics message to worker given its worker node key.
1392 * @param workerNodeKey - The worker node key.
1394 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1395 this.sendToWorker(workerNodeKey
, {
1398 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1400 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1406 private redistributeQueuedTasks (workerNodeKey
: number): void {
1407 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1408 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1409 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1410 return workerNode
.info
.ready
&&
1411 workerNode
.usage
.tasks
.queued
<
1412 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1418 const task
= this.dequeueTask(workerNodeKey
) as Task
<Data
>
1419 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1420 this.executeTask(destinationWorkerNodeKey
, task
)
1422 this.enqueueTask(destinationWorkerNodeKey
, task
)
1427 private updateTaskStolenStatisticsWorkerUsage (
1428 workerNodeKey
: number,
1431 const workerNode
= this.workerNodes
[workerNodeKey
]
1432 if (workerNode
?.usage
!= null) {
1433 ++workerNode
.usage
.tasks
.stolen
1436 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1437 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1439 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1442 ++taskFunctionWorkerUsage
.tasks
.stolen
1446 private readonly handleEmptyQueueEvent
= (
1447 event
: CustomEvent
<WorkerNodeEventDetail
>
1449 const destinationWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1450 event
.detail
.workerId
1452 const workerNodes
= this.workerNodes
1455 (workerNodeA
, workerNodeB
) =>
1456 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1458 const sourceWorkerNode
= workerNodes
.find(
1460 workerNode
.info
.ready
&&
1461 workerNode
.info
.id
!== event
.detail
.workerId
&&
1462 workerNode
.usage
.tasks
.queued
> 0
1464 if (sourceWorkerNode
!= null) {
1465 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1466 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1467 this.executeTask(destinationWorkerNodeKey
, task
)
1469 this.enqueueTask(destinationWorkerNodeKey
, task
)
1471 this.updateTaskStolenStatisticsWorkerUsage(
1472 destinationWorkerNodeKey
,
1478 private readonly handleBackPressureEvent
= (
1479 event
: CustomEvent
<WorkerNodeEventDetail
>
1481 const sizeOffset
= 1
1482 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1485 const sourceWorkerNode
=
1486 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(event
.detail
.workerId
)]
1487 const workerNodes
= this.workerNodes
1490 (workerNodeA
, workerNodeB
) =>
1491 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1493 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1495 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1496 workerNode
.info
.ready
&&
1497 workerNode
.info
.id
!== event
.detail
.workerId
&&
1498 workerNode
.usage
.tasks
.queued
<
1499 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1501 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1502 if (this.shallExecuteTask(workerNodeKey
)) {
1503 this.executeTask(workerNodeKey
, task
)
1505 this.enqueueTask(workerNodeKey
, task
)
1507 this.updateTaskStolenStatisticsWorkerUsage(
1516 * This method is the message listener registered on each worker.
1518 protected workerMessageListener (message
: MessageValue
<Response
>): void {
1519 this.checkMessageWorkerId(message
)
1520 if (message
.ready
!= null && message
.taskFunctionNames
!= null) {
1521 // Worker ready response received from worker
1522 this.handleWorkerReadyResponse(message
)
1523 } else if (message
.taskId
!= null) {
1524 // Task execution response received from worker
1525 this.handleTaskExecutionResponse(message
)
1526 } else if (message
.taskFunctionNames
!= null) {
1527 // Task function names message received from worker
1529 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1530 ).taskFunctionNames
= message
.taskFunctionNames
1534 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1535 if (message
.ready
=== false) {
1537 `Worker ${message.workerId as number} failed to initialize`
1540 const workerInfo
= this.getWorkerInfo(
1541 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1543 workerInfo
.ready
= message
.ready
as boolean
1544 workerInfo
.taskFunctionNames
= message
.taskFunctionNames
1546 const emitPoolReadyEventOnce
= once(
1547 () => this.emitter
?.emit(PoolEvents
.ready
, this.info
),
1550 emitPoolReadyEventOnce()
1554 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1555 const { taskId
, workerError
, data
} = message
1556 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1557 if (promiseResponse
!= null) {
1558 if (workerError
!= null) {
1559 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1560 promiseResponse
.reject(workerError
.message
)
1562 promiseResponse
.resolve(data
as Response
)
1564 const workerNodeKey
= promiseResponse
.workerNodeKey
1565 this.afterTaskExecutionHook(workerNodeKey
, message
)
1566 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1567 this.promiseResponseMap
.delete(taskId
as string)
1569 this.opts
.enableTasksQueue
=== true &&
1570 this.tasksQueueSize(workerNodeKey
) > 0 &&
1571 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
1572 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1576 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1582 private checkAndEmitTaskExecutionEvents (): void {
1584 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1588 private checkAndEmitTaskQueuingEvents (): void {
1589 if (this.hasBackPressure()) {
1590 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1594 private checkAndEmitDynamicWorkerCreationEvents (): void {
1595 if (this.type === PoolTypes
.dynamic
) {
1597 this.emitter
?.emit(PoolEvents
.full
, this.info
)
1603 * Gets the worker information given its worker node key.
1605 * @param workerNodeKey - The worker node key.
1606 * @returns The worker information.
1608 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
{
1609 return this.workerNodes
[workerNodeKey
]?.info
1613 * Adds the given worker in the pool worker nodes.
1615 * @param worker - The worker.
1616 * @returns The added worker node key.
1617 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1619 private addWorkerNode (worker
: Worker
): number {
1620 const workerNode
= new WorkerNode
<Worker
, Data
>(
1622 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1624 // Flag the worker node as ready at pool startup.
1625 if (this.starting
) {
1626 workerNode
.info
.ready
= true
1628 this.workerNodes
.push(workerNode
)
1629 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1630 if (workerNodeKey
=== -1) {
1631 throw new Error('Worker added not found in worker nodes')
1633 return workerNodeKey
1637 * Removes the given worker from the pool worker nodes.
1639 * @param worker - The worker.
1641 private removeWorkerNode (worker
: Worker
): void {
1642 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1643 if (workerNodeKey
!== -1) {
1644 this.workerNodes
.splice(workerNodeKey
, 1)
1645 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
1649 protected flagWorkerNodeAsNotReady (workerNodeKey
: number): void {
1650 this.getWorkerInfo(workerNodeKey
).ready
= false
1654 public hasWorkerNodeBackPressure (workerNodeKey
: number): boolean {
1656 this.opts
.enableTasksQueue
=== true &&
1657 this.workerNodes
[workerNodeKey
].hasBackPressure()
1661 private hasBackPressure (): boolean {
1663 this.opts
.enableTasksQueue
=== true &&
1664 this.workerNodes
.findIndex(
1665 workerNode
=> !workerNode
.hasBackPressure()
1671 * Executes the given task on the worker given its worker node key.
1673 * @param workerNodeKey - The worker node key.
1674 * @param task - The task to execute.
1676 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1677 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1678 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1679 this.checkAndEmitTaskExecutionEvents()
1682 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1683 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1684 this.checkAndEmitTaskQueuingEvents()
1685 return tasksQueueSize
1688 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1689 return this.workerNodes
[workerNodeKey
].dequeueTask()
1692 private tasksQueueSize (workerNodeKey
: number): number {
1693 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1696 protected flushTasksQueue (workerNodeKey
: number): void {
1697 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1700 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1703 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1706 private flushTasksQueues (): void {
1707 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1708 this.flushTasksQueue(workerNodeKey
)