1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
23 import { KillBehaviors
} from
'../worker/worker-options'
24 import type { TaskFunction
} from
'../worker/task-functions'
32 type TasksQueueOptions
38 WorkerNodeEventDetail
,
43 type MeasurementStatisticsRequirements
,
45 WorkerChoiceStrategies
,
46 type WorkerChoiceStrategy
,
47 type WorkerChoiceStrategyOptions
48 } from
'./selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
50 import { version
} from
'./version'
51 import { WorkerNode
} from
'./worker-node'
54 checkValidTasksQueueOptions
,
55 checkValidWorkerChoiceStrategy
,
56 updateMeasurementStatistics
60 * Base class that implements some shared logic for all poolifier pools.
62 * @typeParam Worker - Type of worker which manages this pool.
63 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
64 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
66 export abstract class AbstractPool
<
67 Worker
extends IWorker
,
70 > implements IPool
<Worker
, Data
, Response
> {
72 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
75 public emitter
?: EventEmitterAsyncResource
78 * Dynamic pool maximum size property placeholder.
80 protected readonly max
?: number
83 * The task execution response promise map:
84 * - `key`: The message id of each submitted task.
85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
89 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
90 new Map
<string, PromiseResponseWrapper
<Response
>>()
93 * Worker choice strategy context referencing a worker choice algorithm implementation.
95 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
106 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
109 * Whether the pool is started or not.
111 private started
: boolean
113 * Whether the pool is starting or not.
115 private starting
: boolean
117 * Whether the pool is destroying or not.
119 private destroying
: boolean
121 * The start timestamp of the pool.
123 private readonly startTimestamp
126 * Constructs a new poolifier pool.
128 * @param numberOfWorkers - Number of workers that this pool should manage.
129 * @param filePath - Path to the worker file.
130 * @param opts - Options for the pool.
133 protected readonly numberOfWorkers
: number,
134 protected readonly filePath
: string,
135 protected readonly opts
: PoolOptions
<Worker
>
137 if (!this.isMain()) {
139 'Cannot start a pool from a worker with the same type as the pool'
142 checkFilePath(this.filePath
)
143 this.checkNumberOfWorkers(this.numberOfWorkers
)
144 this.checkPoolOptions(this.opts
)
146 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
147 this.executeTask
= this.executeTask
.bind(this)
148 this.enqueueTask
= this.enqueueTask
.bind(this)
150 if (this.opts
.enableEvents
=== true) {
151 this.initializeEventEmitter()
153 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
159 this.opts
.workerChoiceStrategy
,
160 this.opts
.workerChoiceStrategyOptions
165 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
168 this.starting
= false
169 this.destroying
= false
170 if (this.opts
.startWorkers
=== true) {
174 this.startTimestamp
= performance
.now()
177 private checkNumberOfWorkers (numberOfWorkers
: number): void {
178 if (numberOfWorkers
== null) {
180 'Cannot instantiate a pool without specifying the number of workers'
182 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
184 'Cannot instantiate a pool with a non safe integer number of workers'
186 } else if (numberOfWorkers
< 0) {
187 throw new RangeError(
188 'Cannot instantiate a pool with a negative number of workers'
190 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
191 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
195 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
196 if (isPlainObject(opts
)) {
197 this.opts
.startWorkers
= opts
.startWorkers
?? true
198 checkValidWorkerChoiceStrategy(
199 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
201 this.opts
.workerChoiceStrategy
=
202 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
203 this.checkValidWorkerChoiceStrategyOptions(
204 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
206 this.opts
.workerChoiceStrategyOptions
= {
207 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
208 ...opts
.workerChoiceStrategyOptions
210 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
211 this.opts
.enableEvents
= opts
.enableEvents
?? true
212 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
213 if (this.opts
.enableTasksQueue
) {
214 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
215 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
216 opts
.tasksQueueOptions
as TasksQueueOptions
220 throw new TypeError('Invalid pool options: must be a plain object')
224 private checkValidWorkerChoiceStrategyOptions (
225 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
228 workerChoiceStrategyOptions
!= null &&
229 !isPlainObject(workerChoiceStrategyOptions
)
232 'Invalid worker choice strategy options: must be a plain object'
236 workerChoiceStrategyOptions
?.retries
!= null &&
237 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
240 'Invalid worker choice strategy options: retries must be an integer'
244 workerChoiceStrategyOptions
?.retries
!= null &&
245 workerChoiceStrategyOptions
.retries
< 0
247 throw new RangeError(
248 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
252 workerChoiceStrategyOptions
?.weights
!= null &&
253 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
256 'Invalid worker choice strategy options: must have a weight for each worker node'
260 workerChoiceStrategyOptions
?.measurement
!= null &&
261 !Object.values(Measurements
).includes(
262 workerChoiceStrategyOptions
.measurement
266 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
271 private initializeEventEmitter (): void {
272 this.emitter
= new EventEmitterAsyncResource({
273 name
: `poolifier:${this.type}-${this.worker}-pool`
278 public get
info (): PoolInfo
{
283 started
: this.started
,
285 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
286 minSize
: this.minSize
,
287 maxSize
: this.maxSize
,
288 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
289 .runTime
.aggregate
&&
290 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
291 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
292 workerNodes
: this.workerNodes
.length
,
293 idleWorkerNodes
: this.workerNodes
.reduce(
294 (accumulator
, workerNode
) =>
295 workerNode
.usage
.tasks
.executing
=== 0
300 busyWorkerNodes
: this.workerNodes
.reduce(
301 (accumulator
, workerNode
) =>
302 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
305 executedTasks
: this.workerNodes
.reduce(
306 (accumulator
, workerNode
) =>
307 accumulator
+ workerNode
.usage
.tasks
.executed
,
310 executingTasks
: this.workerNodes
.reduce(
311 (accumulator
, workerNode
) =>
312 accumulator
+ workerNode
.usage
.tasks
.executing
,
315 ...(this.opts
.enableTasksQueue
=== true && {
316 queuedTasks
: this.workerNodes
.reduce(
317 (accumulator
, workerNode
) =>
318 accumulator
+ workerNode
.usage
.tasks
.queued
,
322 ...(this.opts
.enableTasksQueue
=== true && {
323 maxQueuedTasks
: this.workerNodes
.reduce(
324 (accumulator
, workerNode
) =>
325 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
329 ...(this.opts
.enableTasksQueue
=== true && {
330 backPressure
: this.hasBackPressure()
332 ...(this.opts
.enableTasksQueue
=== true && {
333 stolenTasks
: this.workerNodes
.reduce(
334 (accumulator
, workerNode
) =>
335 accumulator
+ workerNode
.usage
.tasks
.stolen
,
339 failedTasks
: this.workerNodes
.reduce(
340 (accumulator
, workerNode
) =>
341 accumulator
+ workerNode
.usage
.tasks
.failed
,
344 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
345 .runTime
.aggregate
&& {
349 ...this.workerNodes
.map(
350 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
356 ...this.workerNodes
.map(
357 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
361 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
362 .runTime
.average
&& {
365 this.workerNodes
.reduce
<number[]>(
366 (accumulator
, workerNode
) =>
367 accumulator
.concat(workerNode
.usage
.runTime
.history
),
373 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
377 this.workerNodes
.reduce
<number[]>(
378 (accumulator
, workerNode
) =>
379 accumulator
.concat(workerNode
.usage
.runTime
.history
),
387 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
388 .waitTime
.aggregate
&& {
392 ...this.workerNodes
.map(
393 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
399 ...this.workerNodes
.map(
400 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
404 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
405 .waitTime
.average
&& {
408 this.workerNodes
.reduce
<number[]>(
409 (accumulator
, workerNode
) =>
410 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
416 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
417 .waitTime
.median
&& {
420 this.workerNodes
.reduce
<number[]>(
421 (accumulator
, workerNode
) =>
422 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
434 * The pool readiness boolean status.
436 private get
ready (): boolean {
438 this.workerNodes
.reduce(
439 (accumulator
, workerNode
) =>
440 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
449 * The approximate pool utilization.
451 * @returns The pool utilization.
453 private get
utilization (): number {
454 const poolTimeCapacity
=
455 (performance
.now() - this.startTimestamp
) * this.maxSize
456 const totalTasksRunTime
= this.workerNodes
.reduce(
457 (accumulator
, workerNode
) =>
458 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
461 const totalTasksWaitTime
= this.workerNodes
.reduce(
462 (accumulator
, workerNode
) =>
463 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
466 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
472 * If it is `'dynamic'`, it provides the `max` property.
474 protected abstract get
type (): PoolType
479 protected abstract get
worker (): WorkerType
482 * The pool minimum size.
484 protected get
minSize (): number {
485 return this.numberOfWorkers
489 * The pool maximum size.
491 protected get
maxSize (): number {
492 return this.max
?? this.numberOfWorkers
496 * Checks if the worker id sent in the received message from a worker is valid.
498 * @param message - The received message.
499 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
501 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
502 if (message
.workerId
== null) {
503 throw new Error('Worker message received without worker id')
504 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
506 `Worker message received from unknown worker '${message.workerId}'`
512 * Gets the given worker its worker node key.
514 * @param worker - The worker.
515 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
517 private getWorkerNodeKeyByWorker (worker
: Worker
): number {
518 return this.workerNodes
.findIndex(
519 workerNode
=> workerNode
.worker
=== worker
524 * Gets the worker node key given its worker id.
526 * @param workerId - The worker id.
527 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
529 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
530 return this.workerNodes
.findIndex(
531 workerNode
=> workerNode
.info
.id
=== workerId
536 public setWorkerChoiceStrategy (
537 workerChoiceStrategy
: WorkerChoiceStrategy
,
538 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
540 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
541 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
542 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
543 this.opts
.workerChoiceStrategy
545 if (workerChoiceStrategyOptions
!= null) {
546 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
548 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
549 workerNode
.resetUsage()
550 this.sendStatisticsMessageToWorker(workerNodeKey
)
555 public setWorkerChoiceStrategyOptions (
556 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
558 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
559 this.opts
.workerChoiceStrategyOptions
= {
560 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
561 ...workerChoiceStrategyOptions
563 this.workerChoiceStrategyContext
.setOptions(
564 this.opts
.workerChoiceStrategyOptions
569 public enableTasksQueue (
571 tasksQueueOptions
?: TasksQueueOptions
573 if (this.opts
.enableTasksQueue
=== true && !enable
) {
574 this.unsetTaskStealing()
575 this.unsetTasksStealingOnBackPressure()
576 this.flushTasksQueues()
578 this.opts
.enableTasksQueue
= enable
579 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
583 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
584 if (this.opts
.enableTasksQueue
=== true) {
585 checkValidTasksQueueOptions(tasksQueueOptions
)
586 this.opts
.tasksQueueOptions
=
587 this.buildTasksQueueOptions(tasksQueueOptions
)
588 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
589 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
590 this.setTaskStealing()
592 this.unsetTaskStealing()
594 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
595 this.setTasksStealingOnBackPressure()
597 this.unsetTasksStealingOnBackPressure()
599 } else if (this.opts
.tasksQueueOptions
!= null) {
600 delete this.opts
.tasksQueueOptions
604 private buildTasksQueueOptions (
605 tasksQueueOptions
: TasksQueueOptions
606 ): TasksQueueOptions
{
609 size
: Math.pow(this.maxSize
, 2),
612 tasksStealingOnBackPressure
: true
618 private setTasksQueueSize (size
: number): void {
619 for (const workerNode
of this.workerNodes
) {
620 workerNode
.tasksQueueBackPressureSize
= size
624 private setTaskStealing (): void {
625 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
626 this.workerNodes
[workerNodeKey
].addEventListener(
628 this.handleEmptyQueueEvent
as EventListener
633 private unsetTaskStealing (): void {
634 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
635 this.workerNodes
[workerNodeKey
].removeEventListener(
637 this.handleEmptyQueueEvent
as EventListener
642 private setTasksStealingOnBackPressure (): void {
643 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
644 this.workerNodes
[workerNodeKey
].addEventListener(
646 this.handleBackPressureEvent
as EventListener
651 private unsetTasksStealingOnBackPressure (): void {
652 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
653 this.workerNodes
[workerNodeKey
].removeEventListener(
655 this.handleBackPressureEvent
as EventListener
661 * Whether the pool is full or not.
663 * The pool filling boolean status.
665 protected get
full (): boolean {
666 return this.workerNodes
.length
>= this.maxSize
670 * Whether the pool is busy or not.
672 * The pool busyness boolean status.
674 protected abstract get
busy (): boolean
677 * Whether worker nodes are executing concurrently their tasks quota or not.
679 * @returns Worker nodes busyness boolean status.
681 protected internalBusy (): boolean {
682 if (this.opts
.enableTasksQueue
=== true) {
684 this.workerNodes
.findIndex(
686 workerNode
.info
.ready
&&
687 workerNode
.usage
.tasks
.executing
<
688 (this.opts
.tasksQueueOptions
?.concurrency
as number)
693 this.workerNodes
.findIndex(
695 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
700 private async sendTaskFunctionOperationToWorker (
701 workerNodeKey
: number,
702 message
: MessageValue
<Data
>
703 ): Promise
<boolean> {
704 return await new Promise
<boolean>((resolve
, reject
) => {
705 const taskFunctionOperationListener
= (
706 message
: MessageValue
<Response
>
708 this.checkMessageWorkerId(message
)
709 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
711 message
.taskFunctionOperationStatus
!= null &&
712 message
.workerId
=== workerId
714 if (message
.taskFunctionOperationStatus
) {
716 } else if (!message
.taskFunctionOperationStatus
) {
719 `Task function operation '${
720 message.taskFunctionOperation as string
721 }' failed on worker ${message.workerId} with error: '${
722 message.workerError?.message as string
727 this.deregisterWorkerMessageListener(
728 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
729 taskFunctionOperationListener
733 this.registerWorkerMessageListener(
735 taskFunctionOperationListener
737 this.sendToWorker(workerNodeKey
, message
)
741 private async sendTaskFunctionOperationToWorkers (
742 message
: MessageValue
<Data
>
743 ): Promise
<boolean> {
744 return await new Promise
<boolean>((resolve
, reject
) => {
745 const responsesReceived
= new Array<MessageValue
<Response
>>()
746 const taskFunctionOperationsListener
= (
747 message
: MessageValue
<Response
>
749 this.checkMessageWorkerId(message
)
750 if (message
.taskFunctionOperationStatus
!= null) {
751 responsesReceived
.push(message
)
752 if (responsesReceived
.length
=== this.workerNodes
.length
) {
754 responsesReceived
.every(
755 message
=> message
.taskFunctionOperationStatus
=== true
760 responsesReceived
.some(
761 message
=> message
.taskFunctionOperationStatus
=== false
764 const errorResponse
= responsesReceived
.find(
765 response
=> response
.taskFunctionOperationStatus
=== false
769 `Task function operation '${
770 message.taskFunctionOperation as string
771 }' failed on worker ${
772 errorResponse?.workerId as number
774 errorResponse?.workerError?.message as string
779 this.deregisterWorkerMessageListener(
780 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
781 taskFunctionOperationsListener
786 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
787 this.registerWorkerMessageListener(
789 taskFunctionOperationsListener
791 this.sendToWorker(workerNodeKey
, message
)
797 public hasTaskFunction (name
: string): boolean {
798 for (const workerNode
of this.workerNodes
) {
800 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
801 workerNode
.info
.taskFunctionNames
.includes(name
)
810 public async addTaskFunction (
812 fn
: TaskFunction
<Data
, Response
>
813 ): Promise
<boolean> {
814 if (typeof name
!== 'string') {
815 throw new TypeError('name argument must be a string')
817 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
818 throw new TypeError('name argument must not be an empty string')
820 if (typeof fn
!== 'function') {
821 throw new TypeError('fn argument must be a function')
823 const opResult
= await this.sendTaskFunctionOperationToWorkers({
824 taskFunctionOperation
: 'add',
825 taskFunctionName
: name
,
826 taskFunction
: fn
.toString()
828 this.taskFunctions
.set(name
, fn
)
833 public async removeTaskFunction (name
: string): Promise
<boolean> {
834 if (!this.taskFunctions
.has(name
)) {
836 'Cannot remove a task function not handled on the pool side'
839 const opResult
= await this.sendTaskFunctionOperationToWorkers({
840 taskFunctionOperation
: 'remove',
841 taskFunctionName
: name
843 this.deleteTaskFunctionWorkerUsages(name
)
844 this.taskFunctions
.delete(name
)
849 public listTaskFunctionNames (): string[] {
850 for (const workerNode
of this.workerNodes
) {
852 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
853 workerNode
.info
.taskFunctionNames
.length
> 0
855 return workerNode
.info
.taskFunctionNames
862 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
863 return await this.sendTaskFunctionOperationToWorkers({
864 taskFunctionOperation
: 'default',
865 taskFunctionName
: name
869 private deleteTaskFunctionWorkerUsages (name
: string): void {
870 for (const workerNode
of this.workerNodes
) {
871 workerNode
.deleteTaskFunctionWorkerUsage(name
)
875 private shallExecuteTask (workerNodeKey
: number): boolean {
877 this.tasksQueueSize(workerNodeKey
) === 0 &&
878 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
879 (this.opts
.tasksQueueOptions
?.concurrency
as number)
884 public async execute (
887 transferList
?: TransferListItem
[]
888 ): Promise
<Response
> {
889 return await new Promise
<Response
>((resolve
, reject
) => {
891 reject(new Error('Cannot execute a task on not started pool'))
894 if (this.destroying
) {
895 reject(new Error('Cannot execute a task on destroying pool'))
898 if (name
!= null && typeof name
!== 'string') {
899 reject(new TypeError('name argument must be a string'))
904 typeof name
=== 'string' &&
905 name
.trim().length
=== 0
907 reject(new TypeError('name argument must not be an empty string'))
910 if (transferList
!= null && !Array.isArray(transferList
)) {
911 reject(new TypeError('transferList argument must be an array'))
914 const timestamp
= performance
.now()
915 const workerNodeKey
= this.chooseWorkerNode()
916 const task
: Task
<Data
> = {
917 name
: name
?? DEFAULT_TASK_NAME
,
918 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
919 data
: data
?? ({} as Data
),
924 this.promiseResponseMap
.set(task
.taskId
as string, {
930 this.opts
.enableTasksQueue
=== false ||
931 (this.opts
.enableTasksQueue
=== true &&
932 this.shallExecuteTask(workerNodeKey
))
934 this.executeTask(workerNodeKey
, task
)
936 this.enqueueTask(workerNodeKey
, task
)
942 public start (): void {
944 throw new Error('Cannot start an already started pool')
947 throw new Error('Cannot start an already starting pool')
949 if (this.destroying
) {
950 throw new Error('Cannot start a destroying pool')
954 this.workerNodes
.reduce(
955 (accumulator
, workerNode
) =>
956 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
958 ) < this.numberOfWorkers
960 this.createAndSetupWorkerNode()
962 this.starting
= false
967 public async destroy (): Promise
<void> {
969 throw new Error('Cannot destroy an already destroyed pool')
972 throw new Error('Cannot destroy an starting pool')
974 if (this.destroying
) {
975 throw new Error('Cannot destroy an already destroying pool')
977 this.destroying
= true
979 this.workerNodes
.map(async (_
, workerNodeKey
) => {
980 await this.destroyWorkerNode(workerNodeKey
)
983 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
984 this.emitter
?.emitDestroy()
985 this.destroying
= false
989 protected async sendKillMessageToWorker (
990 workerNodeKey
: number
992 await new Promise
<void>((resolve
, reject
) => {
993 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
994 this.checkMessageWorkerId(message
)
995 if (message
.kill
=== 'success') {
997 } else if (message
.kill
=== 'failure') {
1000 `Kill message handling failed on worker ${
1001 message.workerId as number
1007 // FIXME: should be registered only once
1008 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1009 this.sendToWorker(workerNodeKey
, { kill
: true })
1014 * Terminates the worker node given its worker node key.
1016 * @param workerNodeKey - The worker node key.
1018 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
1021 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1022 * Can be overridden.
1026 protected setupHook (): void {
1027 /* Intentionally empty */
1031 * Should return whether the worker is the main worker or not.
1033 protected abstract isMain (): boolean
1036 * Hook executed before the worker task execution.
1037 * Can be overridden.
1039 * @param workerNodeKey - The worker node key.
1040 * @param task - The task to execute.
1042 protected beforeTaskExecutionHook (
1043 workerNodeKey
: number,
1046 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1047 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1048 ++workerUsage
.tasks
.executing
1049 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1052 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1053 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1057 const taskFunctionWorkerUsage
= this.workerNodes
[
1059 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1060 ++taskFunctionWorkerUsage
.tasks
.executing
1061 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1066 * Hook executed after the worker task execution.
1067 * Can be overridden.
1069 * @param workerNodeKey - The worker node key.
1070 * @param message - The received message.
1072 protected afterTaskExecutionHook (
1073 workerNodeKey
: number,
1074 message
: MessageValue
<Response
>
1076 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1077 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1078 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1079 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1080 this.updateEluWorkerUsage(workerUsage
, message
)
1083 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1084 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1085 message
.taskPerformance
?.name
as string
1088 const taskFunctionWorkerUsage
= this.workerNodes
[
1090 ].getTaskFunctionWorkerUsage(
1091 message
.taskPerformance
?.name
as string
1093 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1094 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1095 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
1100 * Whether the worker node shall update its task function worker usage or not.
1102 * @param workerNodeKey - The worker node key.
1103 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1105 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1106 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1108 workerInfo
!= null &&
1109 Array.isArray(workerInfo
.taskFunctionNames
) &&
1110 workerInfo
.taskFunctionNames
.length
> 2
1114 private updateTaskStatisticsWorkerUsage (
1115 workerUsage
: WorkerUsage
,
1116 message
: MessageValue
<Response
>
1118 const workerTaskStatistics
= workerUsage
.tasks
1120 workerTaskStatistics
.executing
!= null &&
1121 workerTaskStatistics
.executing
> 0
1123 --workerTaskStatistics
.executing
1125 if (message
.workerError
== null) {
1126 ++workerTaskStatistics
.executed
1128 ++workerTaskStatistics
.failed
1132 private updateRunTimeWorkerUsage (
1133 workerUsage
: WorkerUsage
,
1134 message
: MessageValue
<Response
>
1136 if (message
.workerError
!= null) {
1139 updateMeasurementStatistics(
1140 workerUsage
.runTime
,
1141 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1142 message
.taskPerformance
?.runTime
?? 0
1146 private updateWaitTimeWorkerUsage (
1147 workerUsage
: WorkerUsage
,
1150 const timestamp
= performance
.now()
1151 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1152 updateMeasurementStatistics(
1153 workerUsage
.waitTime
,
1154 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1159 private updateEluWorkerUsage (
1160 workerUsage
: WorkerUsage
,
1161 message
: MessageValue
<Response
>
1163 if (message
.workerError
!= null) {
1166 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1167 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1168 updateMeasurementStatistics(
1169 workerUsage
.elu
.active
,
1170 eluTaskStatisticsRequirements
,
1171 message
.taskPerformance
?.elu
?.active
?? 0
1173 updateMeasurementStatistics(
1174 workerUsage
.elu
.idle
,
1175 eluTaskStatisticsRequirements
,
1176 message
.taskPerformance
?.elu
?.idle
?? 0
1178 if (eluTaskStatisticsRequirements
.aggregate
) {
1179 if (message
.taskPerformance
?.elu
!= null) {
1180 if (workerUsage
.elu
.utilization
!= null) {
1181 workerUsage
.elu
.utilization
=
1182 (workerUsage
.elu
.utilization
+
1183 message
.taskPerformance
.elu
.utilization
) /
1186 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
1193 * Chooses a worker node for the next task.
1195 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1197 * @returns The chosen worker node key
1199 private chooseWorkerNode (): number {
1200 if (this.shallCreateDynamicWorker()) {
1201 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1203 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1205 return workerNodeKey
1208 return this.workerChoiceStrategyContext
.execute()
1212 * Conditions for dynamic worker creation.
1214 * @returns Whether to create a dynamic worker or not.
1216 private shallCreateDynamicWorker (): boolean {
1217 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
1221 * Sends a message to worker given its worker node key.
1223 * @param workerNodeKey - The worker node key.
1224 * @param message - The message.
1225 * @param transferList - The optional array of transferable objects.
1227 protected abstract sendToWorker (
1228 workerNodeKey
: number,
1229 message
: MessageValue
<Data
>,
1230 transferList
?: TransferListItem
[]
1234 * Creates a new worker.
1236 * @returns Newly created worker.
1238 protected abstract createWorker (): Worker
1241 * Creates a new, completely set up worker node.
1243 * @returns New, completely set up worker node key.
1245 protected createAndSetupWorkerNode (): number {
1246 const worker
= this.createWorker()
1248 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1249 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1250 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1251 worker
.on('error', error
=> {
1252 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1253 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1254 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1255 this.emitter
?.emit(PoolEvents
.error
, error
)
1256 this.workerNodes
[workerNodeKey
].closeChannel()
1261 this.opts
.restartWorkerOnError
=== true
1263 if (workerInfo
.dynamic
) {
1264 this.createAndSetupDynamicWorkerNode()
1266 this.createAndSetupWorkerNode()
1269 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1270 this.redistributeQueuedTasks(workerNodeKey
)
1273 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1274 worker
.once('exit', () => {
1275 this.removeWorkerNode(worker
)
1278 const workerNodeKey
= this.addWorkerNode(worker
)
1280 this.afterWorkerNodeSetup(workerNodeKey
)
1282 return workerNodeKey
1286 * Creates a new, completely set up dynamic worker node.
1288 * @returns New, completely set up dynamic worker node key.
1290 protected createAndSetupDynamicWorkerNode (): number {
1291 const workerNodeKey
= this.createAndSetupWorkerNode()
1292 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1293 this.checkMessageWorkerId(message
)
1294 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1297 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1298 // Kill message received from worker
1300 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1301 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1302 ((this.opts
.enableTasksQueue
=== false &&
1303 workerUsage
.tasks
.executing
=== 0) ||
1304 (this.opts
.enableTasksQueue
=== true &&
1305 workerUsage
.tasks
.executing
=== 0 &&
1306 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1308 // Flag the worker node as not ready immediately
1309 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1310 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1311 this.emitter
?.emit(PoolEvents
.error
, error
)
1315 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1316 this.sendToWorker(workerNodeKey
, {
1319 if (this.taskFunctions
.size
> 0) {
1320 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1321 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1322 taskFunctionOperation
: 'add',
1324 taskFunction
: taskFunction
.toString()
1326 this.emitter
?.emit(PoolEvents
.error
, error
)
1330 workerInfo
.dynamic
= true
1332 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1333 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1335 workerInfo
.ready
= true
1337 this.checkAndEmitDynamicWorkerCreationEvents()
1338 return workerNodeKey
1342 * Registers a listener callback on the worker given its worker node key.
1344 * @param workerNodeKey - The worker node key.
1345 * @param listener - The message listener callback.
1347 protected abstract registerWorkerMessageListener
<
1348 Message
extends Data
| Response
1350 workerNodeKey
: number,
1351 listener
: (message
: MessageValue
<Message
>) => void
1355 * Registers once a listener callback on the worker given its worker node key.
1357 * @param workerNodeKey - The worker node key.
1358 * @param listener - The message listener callback.
1360 protected abstract registerOnceWorkerMessageListener
<
1361 Message
extends Data
| Response
1363 workerNodeKey
: number,
1364 listener
: (message
: MessageValue
<Message
>) => void
1368 * Deregisters a listener callback on the worker given its worker node key.
1370 * @param workerNodeKey - The worker node key.
1371 * @param listener - The message listener callback.
1373 protected abstract deregisterWorkerMessageListener
<
1374 Message
extends Data
| Response
1376 workerNodeKey
: number,
1377 listener
: (message
: MessageValue
<Message
>) => void
1381 * Method hooked up after a worker node has been newly created.
1382 * Can be overridden.
1384 * @param workerNodeKey - The newly created worker node key.
1386 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1387 // Listen to worker messages.
1388 this.registerWorkerMessageListener(
1390 this.workerMessageListener
.bind(this)
1392 // Send the startup message to worker.
1393 this.sendStartupMessageToWorker(workerNodeKey
)
1394 // Send the statistics message to worker.
1395 this.sendStatisticsMessageToWorker(workerNodeKey
)
1396 if (this.opts
.enableTasksQueue
=== true) {
1397 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1398 this.workerNodes
[workerNodeKey
].addEventListener(
1400 this.handleEmptyQueueEvent
as EventListener
1403 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1404 this.workerNodes
[workerNodeKey
].addEventListener(
1406 this.handleBackPressureEvent
as EventListener
1413 * Sends the startup message to worker given its worker node key.
1415 * @param workerNodeKey - The worker node key.
1417 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1420 * Sends the statistics message to worker given its worker node key.
1422 * @param workerNodeKey - The worker node key.
1424 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1425 this.sendToWorker(workerNodeKey
, {
1428 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1430 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1436 private redistributeQueuedTasks (workerNodeKey
: number): void {
1437 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1438 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1439 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1440 return workerNode
.info
.ready
&&
1441 workerNode
.usage
.tasks
.queued
<
1442 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1448 const task
= this.dequeueTask(workerNodeKey
) as Task
<Data
>
1449 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1450 this.executeTask(destinationWorkerNodeKey
, task
)
1452 this.enqueueTask(destinationWorkerNodeKey
, task
)
1457 private updateTaskStolenStatisticsWorkerUsage (
1458 workerNodeKey
: number,
1461 const workerNode
= this.workerNodes
[workerNodeKey
]
1462 if (workerNode
?.usage
!= null) {
1463 ++workerNode
.usage
.tasks
.stolen
1466 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1467 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1469 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1472 ++taskFunctionWorkerUsage
.tasks
.stolen
1476 private readonly handleEmptyQueueEvent
= (
1477 event
: CustomEvent
<WorkerNodeEventDetail
>
1479 const { workerId
} = event
.detail
1480 const destinationWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(workerId
)
1481 const workerNodes
= this.workerNodes
1484 (workerNodeA
, workerNodeB
) =>
1485 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1487 const sourceWorkerNode
= workerNodes
.find(
1489 workerNode
.info
.ready
&&
1490 workerNode
.info
.id
!== workerId
&&
1491 workerNode
.usage
.tasks
.queued
> 0
1493 if (sourceWorkerNode
!= null) {
1494 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1495 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1496 this.executeTask(destinationWorkerNodeKey
, task
)
1498 this.enqueueTask(destinationWorkerNodeKey
, task
)
1500 this.updateTaskStolenStatisticsWorkerUsage(
1501 destinationWorkerNodeKey
,
1507 private readonly handleBackPressureEvent
= (
1508 event
: CustomEvent
<WorkerNodeEventDetail
>
1510 const { workerId
} = event
.detail
1511 const sizeOffset
= 1
1512 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1515 const sourceWorkerNode
=
1516 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1517 const workerNodes
= this.workerNodes
1520 (workerNodeA
, workerNodeB
) =>
1521 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1523 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1525 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1526 workerNode
.info
.ready
&&
1527 workerNode
.info
.id
!== workerId
&&
1528 workerNode
.usage
.tasks
.queued
<
1529 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1531 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1532 if (this.shallExecuteTask(workerNodeKey
)) {
1533 this.executeTask(workerNodeKey
, task
)
1535 this.enqueueTask(workerNodeKey
, task
)
1537 this.updateTaskStolenStatisticsWorkerUsage(
1546 * This method is the message listener registered on each worker.
1548 protected workerMessageListener (message
: MessageValue
<Response
>): void {
1549 this.checkMessageWorkerId(message
)
1550 if (message
.ready
!= null && message
.taskFunctionNames
!= null) {
1551 // Worker ready response received from worker
1552 this.handleWorkerReadyResponse(message
)
1553 } else if (message
.taskId
!= null) {
1554 // Task execution response received from worker
1555 this.handleTaskExecutionResponse(message
)
1556 } else if (message
.taskFunctionNames
!= null) {
1557 // Task function names message received from worker
1559 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1560 ).taskFunctionNames
= message
.taskFunctionNames
1564 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1565 if (message
.ready
=== false) {
1567 `Worker ${message.workerId as number} failed to initialize`
1570 const workerInfo
= this.getWorkerInfo(
1571 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1573 workerInfo
.ready
= message
.ready
as boolean
1574 workerInfo
.taskFunctionNames
= message
.taskFunctionNames
1576 const emitPoolReadyEventOnce
= once(
1577 () => this.emitter
?.emit(PoolEvents
.ready
, this.info
),
1580 emitPoolReadyEventOnce()
1584 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1585 const { taskId
, workerError
, data
} = message
1586 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1587 if (promiseResponse
!= null) {
1588 const { resolve
, reject
, workerNodeKey
} = promiseResponse
1589 if (workerError
!= null) {
1590 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1591 reject(workerError
.message
)
1593 resolve(data
as Response
)
1595 this.afterTaskExecutionHook(workerNodeKey
, message
)
1596 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1597 this.promiseResponseMap
.delete(taskId
as string)
1599 this.opts
.enableTasksQueue
=== true &&
1600 this.tasksQueueSize(workerNodeKey
) > 0 &&
1601 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
1602 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1606 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1612 private checkAndEmitTaskExecutionEvents (): void {
1614 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1618 private checkAndEmitTaskQueuingEvents (): void {
1619 if (this.hasBackPressure()) {
1620 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1624 private checkAndEmitDynamicWorkerCreationEvents (): void {
1625 if (this.type === PoolTypes
.dynamic
) {
1627 this.emitter
?.emit(PoolEvents
.full
, this.info
)
1633 * Gets the worker information given its worker node key.
1635 * @param workerNodeKey - The worker node key.
1636 * @returns The worker information.
1638 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
{
1639 return this.workerNodes
[workerNodeKey
]?.info
1643 * Adds the given worker in the pool worker nodes.
1645 * @param worker - The worker.
1646 * @returns The added worker node key.
1647 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1649 private addWorkerNode (worker
: Worker
): number {
1650 const workerNode
= new WorkerNode
<Worker
, Data
>(
1652 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1654 // Flag the worker node as ready at pool startup.
1655 if (this.starting
) {
1656 workerNode
.info
.ready
= true
1658 this.workerNodes
.push(workerNode
)
1659 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1660 if (workerNodeKey
=== -1) {
1661 throw new Error('Worker added not found in worker nodes')
1663 return workerNodeKey
1667 * Removes the given worker from the pool worker nodes.
1669 * @param worker - The worker.
1671 private removeWorkerNode (worker
: Worker
): void {
1672 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1673 if (workerNodeKey
!== -1) {
1674 this.workerNodes
.splice(workerNodeKey
, 1)
1675 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
1679 protected flagWorkerNodeAsNotReady (workerNodeKey
: number): void {
1680 this.getWorkerInfo(workerNodeKey
).ready
= false
1684 public hasWorkerNodeBackPressure (workerNodeKey
: number): boolean {
1686 this.opts
.enableTasksQueue
=== true &&
1687 this.workerNodes
[workerNodeKey
].hasBackPressure()
1691 private hasBackPressure (): boolean {
1693 this.opts
.enableTasksQueue
=== true &&
1694 this.workerNodes
.findIndex(
1695 workerNode
=> !workerNode
.hasBackPressure()
1701 * Executes the given task on the worker given its worker node key.
1703 * @param workerNodeKey - The worker node key.
1704 * @param task - The task to execute.
1706 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1707 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1708 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1709 this.checkAndEmitTaskExecutionEvents()
1712 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1713 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1714 this.checkAndEmitTaskQueuingEvents()
1715 return tasksQueueSize
1718 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1719 return this.workerNodes
[workerNodeKey
].dequeueTask()
1722 private tasksQueueSize (workerNodeKey
: number): number {
1723 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1726 protected flushTasksQueue (workerNodeKey
: number): void {
1727 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1730 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1733 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1736 private flushTasksQueues (): void {
1737 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1738 this.flushTasksQueue(workerNodeKey
)