1 import { AsyncResource
} from
'node:async_hooks'
2 import { randomUUID
} from
'node:crypto'
3 import { EventEmitterAsyncResource
} from
'node:events'
4 import { performance
} from
'node:perf_hooks'
5 import type { TransferListItem
} from
'node:worker_threads'
9 PromiseResponseWrapper
,
11 TaskFunctionProperties
12 } from
'../utility-types.js'
15 buildTaskFunctionProperties
,
30 } from
'../worker/task-functions.js'
31 import { KillBehaviors
} from
'../worker/worker-options.js'
39 type TasksQueueOptions
43 WorkerChoiceStrategies
,
44 type WorkerChoiceStrategy
,
45 type WorkerChoiceStrategyOptions
46 } from
'./selection-strategies/selection-strategies-types.js'
47 import { WorkerChoiceStrategiesContext
} from
'./selection-strategies/worker-choice-strategies-context.js'
51 checkValidTasksQueueOptions
,
52 checkValidWorkerChoiceStrategy
,
53 getDefaultTasksQueueOptions
,
55 updateRunTimeWorkerUsage
,
56 updateTaskStatisticsWorkerUsage
,
57 updateWaitTimeWorkerUsage
,
60 import { version
} from
'./version.js'
65 WorkerNodeEventDetail
,
68 import { WorkerNode
} from
'./worker-node.js'
71 * Base class that implements some shared logic for all poolifier pools.
73 * @typeParam Worker - Type of worker which manages this pool.
74 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
75 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
77 export abstract class AbstractPool
<
78 Worker
extends IWorker
,
81 > implements IPool
<Worker
, Data
, Response
> {
83 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
86 public emitter
?: EventEmitterAsyncResource
89 * The task execution response promise map:
90 * - `key`: The message id of each submitted task.
91 * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
93 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
95 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
96 new Map
<string, PromiseResponseWrapper
<Response
>>()
99 * Worker choice strategies context referencing worker choice algorithms implementation.
101 protected workerChoiceStrategiesContext
?: WorkerChoiceStrategiesContext
<
108 * The task functions added at runtime map:
109 * - `key`: The task function name.
110 * - `value`: The task function object.
112 private readonly taskFunctions
: Map
<
114 TaskFunctionObject
<Data
, Response
>
118 * Whether the pool is started or not.
120 private started
: boolean
122 * Whether the pool is starting or not.
124 private starting
: boolean
126 * Whether the pool is destroying or not.
128 private destroying
: boolean
130 * Whether the minimum number of workers is starting or not.
132 private startingMinimumNumberOfWorkers
: boolean
134 * Whether the pool ready event has been emitted or not.
136 private readyEventEmitted
: boolean
138 * The start timestamp of the pool.
140 private startTimestamp
?: number
143 * Constructs a new poolifier pool.
145 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
146 * @param filePath - Path to the worker file.
147 * @param opts - Options for the pool.
148 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
151 protected readonly minimumNumberOfWorkers
: number,
152 protected readonly filePath
: string,
153 protected readonly opts
: PoolOptions
<Worker
>,
154 protected readonly maximumNumberOfWorkers
?: number
156 if (!this.isMain()) {
158 'Cannot start a pool from a worker with the same type as the pool'
162 checkFilePath(this.filePath
)
163 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
164 this.checkPoolOptions(this.opts
)
166 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
167 this.executeTask
= this.executeTask
.bind(this)
168 this.enqueueTask
= this.enqueueTask
.bind(this)
170 if (this.opts
.enableEvents
=== true) {
171 this.initializeEventEmitter()
173 this.workerChoiceStrategiesContext
= new WorkerChoiceStrategiesContext
<
179 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
180 [this.opts
.workerChoiceStrategy
!],
181 this.opts
.workerChoiceStrategyOptions
186 this.taskFunctions
= new Map
<string, TaskFunctionObject
<Data
, Response
>>()
189 this.starting
= false
190 this.destroying
= false
191 this.readyEventEmitted
= false
192 this.startingMinimumNumberOfWorkers
= false
193 if (this.opts
.startWorkers
=== true) {
198 private checkPoolType (): void {
199 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
201 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
206 private checkMinimumNumberOfWorkers (
207 minimumNumberOfWorkers
: number | undefined
209 if (minimumNumberOfWorkers
== null) {
211 'Cannot instantiate a pool without specifying the number of workers'
213 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
215 'Cannot instantiate a pool with a non safe integer number of workers'
217 } else if (minimumNumberOfWorkers
< 0) {
218 throw new RangeError(
219 'Cannot instantiate a pool with a negative number of workers'
221 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
222 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
226 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
227 if (isPlainObject(opts
)) {
228 this.opts
.startWorkers
= opts
.startWorkers
?? true
229 checkValidWorkerChoiceStrategy(opts
.workerChoiceStrategy
)
230 this.opts
.workerChoiceStrategy
=
231 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
232 this.checkValidWorkerChoiceStrategyOptions(
233 opts
.workerChoiceStrategyOptions
235 if (opts
.workerChoiceStrategyOptions
!= null) {
236 this.opts
.workerChoiceStrategyOptions
= opts
.workerChoiceStrategyOptions
238 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
239 this.opts
.enableEvents
= opts
.enableEvents
?? true
240 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
241 if (this.opts
.enableTasksQueue
) {
242 checkValidTasksQueueOptions(opts
.tasksQueueOptions
)
243 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
244 opts
.tasksQueueOptions
248 throw new TypeError('Invalid pool options: must be a plain object')
252 private checkValidWorkerChoiceStrategyOptions (
253 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
256 workerChoiceStrategyOptions
!= null &&
257 !isPlainObject(workerChoiceStrategyOptions
)
260 'Invalid worker choice strategy options: must be a plain object'
264 workerChoiceStrategyOptions
?.weights
!= null &&
265 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
266 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
269 'Invalid worker choice strategy options: must have a weight for each worker node'
273 workerChoiceStrategyOptions
?.measurement
!= null &&
274 !Object.values(Measurements
).includes(
275 workerChoiceStrategyOptions
.measurement
279 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
284 private initializeEventEmitter (): void {
285 this.emitter
= new EventEmitterAsyncResource({
286 name
: `poolifier:${this.type}-${this.worker}-pool`
291 public get
info (): PoolInfo
{
296 started
: this.started
,
298 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
299 defaultStrategy
: this.opts
.workerChoiceStrategy
!,
300 strategyRetries
: this.workerChoiceStrategiesContext
?.retriesCount
?? 0,
301 minSize
: this.minimumNumberOfWorkers
,
302 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
303 ...(this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
304 .runTime
.aggregate
=== true &&
305 this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
306 .waitTime
.aggregate
&& {
307 utilization
: round(this.utilization
)
309 workerNodes
: this.workerNodes
.length
,
310 idleWorkerNodes
: this.workerNodes
.reduce(
311 (accumulator
, workerNode
) =>
312 workerNode
.usage
.tasks
.executing
=== 0
317 ...(this.opts
.enableTasksQueue
=== true && {
318 stealingWorkerNodes
: this.workerNodes
.reduce(
319 (accumulator
, workerNode
) =>
320 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
324 busyWorkerNodes
: this.workerNodes
.reduce(
325 (accumulator
, _workerNode
, workerNodeKey
) =>
326 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
329 executedTasks
: this.workerNodes
.reduce(
330 (accumulator
, workerNode
) =>
331 accumulator
+ workerNode
.usage
.tasks
.executed
,
334 executingTasks
: this.workerNodes
.reduce(
335 (accumulator
, workerNode
) =>
336 accumulator
+ workerNode
.usage
.tasks
.executing
,
339 ...(this.opts
.enableTasksQueue
=== true && {
340 queuedTasks
: this.workerNodes
.reduce(
341 (accumulator
, workerNode
) =>
342 accumulator
+ workerNode
.usage
.tasks
.queued
,
346 ...(this.opts
.enableTasksQueue
=== true && {
347 maxQueuedTasks
: this.workerNodes
.reduce(
348 (accumulator
, workerNode
) =>
349 accumulator
+ (workerNode
.usage
.tasks
.maxQueued
?? 0),
353 ...(this.opts
.enableTasksQueue
=== true && {
354 backPressure
: this.hasBackPressure()
356 ...(this.opts
.enableTasksQueue
=== true && {
357 stolenTasks
: this.workerNodes
.reduce(
358 (accumulator
, workerNode
) =>
359 accumulator
+ workerNode
.usage
.tasks
.stolen
,
363 failedTasks
: this.workerNodes
.reduce(
364 (accumulator
, workerNode
) =>
365 accumulator
+ workerNode
.usage
.tasks
.failed
,
368 ...(this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
369 .runTime
.aggregate
=== true && {
373 ...this.workerNodes
.map(
374 workerNode
=> workerNode
.usage
.runTime
.minimum
?? Infinity
380 ...this.workerNodes
.map(
381 workerNode
=> workerNode
.usage
.runTime
.maximum
?? -Infinity
385 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
386 .runTime
.average
&& {
389 this.workerNodes
.reduce
<number[]>(
390 (accumulator
, workerNode
) =>
391 accumulator
.concat(workerNode
.usage
.runTime
.history
),
397 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
401 this.workerNodes
.reduce
<number[]>(
402 (accumulator
, workerNode
) =>
403 accumulator
.concat(workerNode
.usage
.runTime
.history
),
411 ...(this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
412 .waitTime
.aggregate
=== true && {
416 ...this.workerNodes
.map(
417 workerNode
=> workerNode
.usage
.waitTime
.minimum
?? Infinity
423 ...this.workerNodes
.map(
424 workerNode
=> workerNode
.usage
.waitTime
.maximum
?? -Infinity
428 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
429 .waitTime
.average
&& {
432 this.workerNodes
.reduce
<number[]>(
433 (accumulator
, workerNode
) =>
434 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
440 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
441 .waitTime
.median
&& {
444 this.workerNodes
.reduce
<number[]>(
445 (accumulator
, workerNode
) =>
446 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
458 * The pool readiness boolean status.
460 private get
ready (): boolean {
465 this.workerNodes
.reduce(
466 (accumulator
, workerNode
) =>
467 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
471 ) >= this.minimumNumberOfWorkers
476 * The pool emptiness boolean status.
478 protected get
empty (): boolean {
479 return this.minimumNumberOfWorkers
=== 0 && this.workerNodes
.length
=== 0
483 * The approximate pool utilization.
485 * @returns The pool utilization.
487 private get
utilization (): number {
488 if (this.startTimestamp
== null) {
491 const poolTimeCapacity
=
492 (performance
.now() - this.startTimestamp
) *
493 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
494 const totalTasksRunTime
= this.workerNodes
.reduce(
495 (accumulator
, workerNode
) =>
496 accumulator
+ (workerNode
.usage
.runTime
.aggregate
?? 0),
499 const totalTasksWaitTime
= this.workerNodes
.reduce(
500 (accumulator
, workerNode
) =>
501 accumulator
+ (workerNode
.usage
.waitTime
.aggregate
?? 0),
504 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
510 * If it is `'dynamic'`, it provides the `max` property.
512 protected abstract get
type (): PoolType
517 protected abstract get
worker (): WorkerType
520 * Checks if the worker id sent in the received message from a worker is valid.
522 * @param message - The received message.
523 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
525 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
526 if (message
.workerId
== null) {
527 throw new Error('Worker message received without worker id')
528 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
530 `Worker message received from unknown worker '${message.workerId}'`
536 * Gets the worker node key given its worker id.
538 * @param workerId - The worker id.
539 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
541 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
542 return this.workerNodes
.findIndex(
543 workerNode
=> workerNode
.info
.id
=== workerId
548 public setWorkerChoiceStrategy (
549 workerChoiceStrategy
: WorkerChoiceStrategy
,
550 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
552 let requireSync
= false
553 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
554 if (workerChoiceStrategyOptions
!= null) {
555 requireSync
= !this.setWorkerChoiceStrategyOptions(
556 workerChoiceStrategyOptions
559 if (workerChoiceStrategy
!== this.opts
.workerChoiceStrategy
) {
560 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
561 this.workerChoiceStrategiesContext
?.setDefaultWorkerChoiceStrategy(
562 this.opts
.workerChoiceStrategy
,
563 this.opts
.workerChoiceStrategyOptions
568 this.workerChoiceStrategiesContext
?.syncWorkerChoiceStrategies(
569 this.getWorkerWorkerChoiceStrategies(),
570 this.opts
.workerChoiceStrategyOptions
572 for (const workerNodeKey
of this.workerNodes
.keys()) {
573 this.sendStatisticsMessageToWorker(workerNodeKey
)
579 public setWorkerChoiceStrategyOptions (
580 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
582 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
583 if (workerChoiceStrategyOptions
!= null) {
584 this.opts
.workerChoiceStrategyOptions
= workerChoiceStrategyOptions
585 this.workerChoiceStrategiesContext
?.setOptions(
586 this.opts
.workerChoiceStrategyOptions
588 this.workerChoiceStrategiesContext
?.syncWorkerChoiceStrategies(
589 this.getWorkerWorkerChoiceStrategies(),
590 this.opts
.workerChoiceStrategyOptions
592 for (const workerNodeKey
of this.workerNodes
.keys()) {
593 this.sendStatisticsMessageToWorker(workerNodeKey
)
601 public enableTasksQueue (
603 tasksQueueOptions
?: TasksQueueOptions
605 if (this.opts
.enableTasksQueue
=== true && !enable
) {
606 this.unsetTaskStealing()
607 this.unsetTasksStealingOnBackPressure()
608 this.flushTasksQueues()
610 this.opts
.enableTasksQueue
= enable
611 this.setTasksQueueOptions(tasksQueueOptions
)
615 public setTasksQueueOptions (
616 tasksQueueOptions
: TasksQueueOptions
| undefined
618 if (this.opts
.enableTasksQueue
=== true) {
619 checkValidTasksQueueOptions(tasksQueueOptions
)
620 this.opts
.tasksQueueOptions
=
621 this.buildTasksQueueOptions(tasksQueueOptions
)
622 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
623 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
!)
624 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
625 this.unsetTaskStealing()
626 this.setTaskStealing()
628 this.unsetTaskStealing()
630 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
631 this.unsetTasksStealingOnBackPressure()
632 this.setTasksStealingOnBackPressure()
634 this.unsetTasksStealingOnBackPressure()
636 } else if (this.opts
.tasksQueueOptions
!= null) {
637 delete this.opts
.tasksQueueOptions
641 private buildTasksQueueOptions (
642 tasksQueueOptions
: TasksQueueOptions
| undefined
643 ): TasksQueueOptions
{
645 ...getDefaultTasksQueueOptions(
646 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
652 private setTasksQueueSize (size
: number): void {
653 for (const workerNode
of this.workerNodes
) {
654 workerNode
.tasksQueueBackPressureSize
= size
658 private setTaskStealing (): void {
659 for (const workerNodeKey
of this.workerNodes
.keys()) {
660 this.workerNodes
[workerNodeKey
].on('idle', this.handleWorkerNodeIdleEvent
)
664 private unsetTaskStealing (): void {
665 for (const workerNodeKey
of this.workerNodes
.keys()) {
666 this.workerNodes
[workerNodeKey
].off(
668 this.handleWorkerNodeIdleEvent
673 private setTasksStealingOnBackPressure (): void {
674 for (const workerNodeKey
of this.workerNodes
.keys()) {
675 this.workerNodes
[workerNodeKey
].on(
677 this.handleWorkerNodeBackPressureEvent
682 private unsetTasksStealingOnBackPressure (): void {
683 for (const workerNodeKey
of this.workerNodes
.keys()) {
684 this.workerNodes
[workerNodeKey
].off(
686 this.handleWorkerNodeBackPressureEvent
692 * Whether the pool is full or not.
694 * The pool filling boolean status.
696 protected get
full (): boolean {
698 this.workerNodes
.length
>=
699 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
704 * Whether the pool is busy or not.
706 * The pool busyness boolean status.
708 protected abstract get
busy (): boolean
711 * Whether worker nodes are executing concurrently their tasks quota or not.
713 * @returns Worker nodes busyness boolean status.
715 protected internalBusy (): boolean {
716 if (this.opts
.enableTasksQueue
=== true) {
718 this.workerNodes
.findIndex(
720 workerNode
.info
.ready
&&
721 workerNode
.usage
.tasks
.executing
<
722 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
723 this.opts
.tasksQueueOptions
!.concurrency
!
728 this.workerNodes
.findIndex(
730 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
735 private isWorkerNodeBusy (workerNodeKey
: number): boolean {
736 if (this.opts
.enableTasksQueue
=== true) {
738 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
>=
739 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
740 this.opts
.tasksQueueOptions
!.concurrency
!
743 return this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
> 0
746 private async sendTaskFunctionOperationToWorker (
747 workerNodeKey
: number,
748 message
: MessageValue
<Data
>
749 ): Promise
<boolean> {
750 return await new Promise
<boolean>((resolve
, reject
) => {
751 const taskFunctionOperationListener
= (
752 message
: MessageValue
<Response
>
754 this.checkMessageWorkerId(message
)
755 const workerId
= this.getWorkerInfo(workerNodeKey
)?.id
757 message
.taskFunctionOperationStatus
!= null &&
758 message
.workerId
=== workerId
760 if (message
.taskFunctionOperationStatus
) {
765 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
769 this.deregisterWorkerMessageListener(
770 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
771 taskFunctionOperationListener
775 this.registerWorkerMessageListener(
777 taskFunctionOperationListener
779 this.sendToWorker(workerNodeKey
, message
)
783 private async sendTaskFunctionOperationToWorkers (
784 message
: MessageValue
<Data
>
785 ): Promise
<boolean> {
786 return await new Promise
<boolean>((resolve
, reject
) => {
787 const responsesReceived
= new Array<MessageValue
<Response
>>()
788 const taskFunctionOperationsListener
= (
789 message
: MessageValue
<Response
>
791 this.checkMessageWorkerId(message
)
792 if (message
.taskFunctionOperationStatus
!= null) {
793 responsesReceived
.push(message
)
794 if (responsesReceived
.length
=== this.workerNodes
.length
) {
796 responsesReceived
.every(
797 message
=> message
.taskFunctionOperationStatus
=== true
802 responsesReceived
.some(
803 message
=> message
.taskFunctionOperationStatus
=== false
806 const errorResponse
= responsesReceived
.find(
807 response
=> response
.taskFunctionOperationStatus
=== false
811 `Task function operation '${
812 message.taskFunctionOperation as string
813 }' failed on worker ${errorResponse?.workerId} with error: '${
814 errorResponse?.workerError?.message
819 this.deregisterWorkerMessageListener(
820 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
821 taskFunctionOperationsListener
826 for (const workerNodeKey
of this.workerNodes
.keys()) {
827 this.registerWorkerMessageListener(
829 taskFunctionOperationsListener
831 this.sendToWorker(workerNodeKey
, message
)
837 public hasTaskFunction (name
: string): boolean {
838 return this.listTaskFunctionsProperties().some(
839 taskFunctionProperties
=> taskFunctionProperties
.name
=== name
844 public async addTaskFunction (
846 fn
: TaskFunction
<Data
, Response
> | TaskFunctionObject
<Data
, Response
>
847 ): Promise
<boolean> {
848 if (typeof name
!== 'string') {
849 throw new TypeError('name argument must be a string')
851 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
852 throw new TypeError('name argument must not be an empty string')
854 if (typeof fn
=== 'function') {
855 fn
= { taskFunction
: fn
} satisfies TaskFunctionObject
<Data
, Response
>
857 if (typeof fn
.taskFunction
!== 'function') {
858 throw new TypeError('taskFunction property must be a function')
860 checkValidPriority(fn
.priority
)
861 checkValidWorkerChoiceStrategy(fn
.strategy
)
862 const opResult
= await this.sendTaskFunctionOperationToWorkers({
863 taskFunctionOperation
: 'add',
864 taskFunctionProperties
: buildTaskFunctionProperties(name
, fn
),
865 taskFunction
: fn
.taskFunction
.toString()
867 this.taskFunctions
.set(name
, fn
)
868 this.workerChoiceStrategiesContext
?.syncWorkerChoiceStrategies(
869 this.getWorkerWorkerChoiceStrategies()
875 public async removeTaskFunction (name
: string): Promise
<boolean> {
876 if (!this.taskFunctions
.has(name
)) {
878 'Cannot remove a task function not handled on the pool side'
881 const opResult
= await this.sendTaskFunctionOperationToWorkers({
882 taskFunctionOperation
: 'remove',
883 taskFunctionProperties
: buildTaskFunctionProperties(
885 this.taskFunctions
.get(name
)
888 this.deleteTaskFunctionWorkerUsages(name
)
889 this.taskFunctions
.delete(name
)
890 this.workerChoiceStrategiesContext
?.syncWorkerChoiceStrategies(
891 this.getWorkerWorkerChoiceStrategies()
897 public listTaskFunctionsProperties (): TaskFunctionProperties
[] {
898 for (const workerNode
of this.workerNodes
) {
900 Array.isArray(workerNode
.info
.taskFunctionsProperties
) &&
901 workerNode
.info
.taskFunctionsProperties
.length
> 0
903 return workerNode
.info
.taskFunctionsProperties
910 * Gets task function strategy, if any.
912 * @param name - The task function name.
913 * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
915 private readonly getTaskFunctionWorkerWorkerChoiceStrategy
= (
917 ): WorkerChoiceStrategy
| undefined => {
919 return this.listTaskFunctionsProperties().find(
920 (taskFunctionProperties
: TaskFunctionProperties
) =>
921 taskFunctionProperties
.name
=== name
927 * Gets worker node task function priority, if any.
929 * @param workerNodeKey - The worker node key.
930 * @param name - The task function name.
931 * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
933 private readonly getWorkerNodeTaskFunctionPriority
= (
934 workerNodeKey
: number,
936 ): number | undefined => {
938 return this.getWorkerInfo(workerNodeKey
)?.taskFunctionsProperties
?.find(
939 (taskFunctionProperties
: TaskFunctionProperties
) =>
940 taskFunctionProperties
.name
=== name
946 * Gets the worker choice strategies registered in this pool.
948 * @returns The worker choice strategies.
950 private readonly getWorkerWorkerChoiceStrategies
=
951 (): Set
<WorkerChoiceStrategy
> => {
953 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
954 this.opts
.workerChoiceStrategy
!,
955 ...(this.listTaskFunctionsProperties()
957 (taskFunctionProperties
: TaskFunctionProperties
) =>
958 taskFunctionProperties
.strategy
961 (strategy
: WorkerChoiceStrategy
| undefined) => strategy
!= null
962 ) as WorkerChoiceStrategy
[])
967 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
968 return await this.sendTaskFunctionOperationToWorkers({
969 taskFunctionOperation
: 'default',
970 taskFunctionProperties
: buildTaskFunctionProperties(
972 this.taskFunctions
.get(name
)
977 private deleteTaskFunctionWorkerUsages (name
: string): void {
978 for (const workerNode
of this.workerNodes
) {
979 workerNode
.deleteTaskFunctionWorkerUsage(name
)
983 private shallExecuteTask (workerNodeKey
: number): boolean {
985 this.tasksQueueSize(workerNodeKey
) === 0 &&
986 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
987 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
988 this.opts
.tasksQueueOptions
!.concurrency
!
993 public async execute (
996 transferList
?: readonly TransferListItem
[]
997 ): Promise
<Response
> {
998 return await new Promise
<Response
>((resolve
, reject
) => {
1000 reject(new Error('Cannot execute a task on not started pool'))
1003 if (this.destroying
) {
1004 reject(new Error('Cannot execute a task on destroying pool'))
1007 if (name
!= null && typeof name
!== 'string') {
1008 reject(new TypeError('name argument must be a string'))
1013 typeof name
=== 'string' &&
1014 name
.trim().length
=== 0
1016 reject(new TypeError('name argument must not be an empty string'))
1019 if (transferList
!= null && !Array.isArray(transferList
)) {
1020 reject(new TypeError('transferList argument must be an array'))
1023 const timestamp
= performance
.now()
1024 const taskFunctionStrategy
=
1025 this.getTaskFunctionWorkerWorkerChoiceStrategy(name
)
1026 const workerNodeKey
= this.chooseWorkerNode(taskFunctionStrategy
)
1027 const task
: Task
<Data
> = {
1028 name
: name
?? DEFAULT_TASK_NAME
,
1029 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
1030 data
: data
?? ({} as Data
),
1031 priority
: this.getWorkerNodeTaskFunctionPriority(workerNodeKey
, name
),
1032 strategy
: taskFunctionStrategy
,
1035 taskId
: randomUUID()
1037 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1038 this.promiseResponseMap
.set(task
.taskId
!, {
1042 ...(this.emitter
!= null && {
1043 asyncResource
: new AsyncResource('poolifier:task', {
1044 triggerAsyncId
: this.emitter
.asyncId
,
1045 requireManualDestroy
: true
1050 this.opts
.enableTasksQueue
=== false ||
1051 (this.opts
.enableTasksQueue
=== true &&
1052 this.shallExecuteTask(workerNodeKey
))
1054 this.executeTask(workerNodeKey
, task
)
1056 this.enqueueTask(workerNodeKey
, task
)
1062 * Starts the minimum number of workers.
1064 private startMinimumNumberOfWorkers (): void {
1065 this.startingMinimumNumberOfWorkers
= true
1067 this.workerNodes
.reduce(
1068 (accumulator
, workerNode
) =>
1069 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
1071 ) < this.minimumNumberOfWorkers
1073 this.createAndSetupWorkerNode()
1075 this.startingMinimumNumberOfWorkers
= false
1079 public start (): void {
1081 throw new Error('Cannot start an already started pool')
1083 if (this.starting
) {
1084 throw new Error('Cannot start an already starting pool')
1086 if (this.destroying
) {
1087 throw new Error('Cannot start a destroying pool')
1089 this.starting
= true
1090 this.startMinimumNumberOfWorkers()
1091 this.starting
= false
1093 this.startTimestamp
= performance
.now()
1097 public async destroy (): Promise
<void> {
1098 if (!this.started
) {
1099 throw new Error('Cannot destroy an already destroyed pool')
1101 if (this.starting
) {
1102 throw new Error('Cannot destroy an starting pool')
1104 if (this.destroying
) {
1105 throw new Error('Cannot destroy an already destroying pool')
1107 this.destroying
= true
1109 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
1110 await this.destroyWorkerNode(workerNodeKey
)
1113 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
1114 this.emitter
?.emitDestroy()
1115 this.readyEventEmitted
= false
1116 this.destroying
= false
1117 this.started
= false
1118 delete this.startTimestamp
1121 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
1122 await new Promise
<void>((resolve
, reject
) => {
1123 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1124 if (this.workerNodes
[workerNodeKey
] == null) {
1128 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1129 this.checkMessageWorkerId(message
)
1130 if (message
.kill
=== 'success') {
1132 } else if (message
.kill
=== 'failure') {
1135 `Kill message handling failed on worker ${message.workerId}`
1140 // FIXME: should be registered only once
1141 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1142 this.sendToWorker(workerNodeKey
, { kill
: true })
1147 * Terminates the worker node given its worker node key.
1149 * @param workerNodeKey - The worker node key.
1151 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1152 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1153 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1154 const workerNode
= this.workerNodes
[workerNodeKey
]
1155 await waitWorkerNodeEvents(
1159 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1160 getDefaultTasksQueueOptions(
1161 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1162 ).tasksFinishedTimeout
1164 await this.sendKillMessageToWorker(workerNodeKey
)
1165 await workerNode
.terminate()
1169 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1170 * Can be overridden.
1174 protected setupHook (): void {
1175 /* Intentionally empty */
1179 * Returns whether the worker is the main worker or not.
1181 * @returns `true` if the worker is the main worker, `false` otherwise.
1183 protected abstract isMain (): boolean
1186 * Hook executed before the worker task execution.
1187 * Can be overridden.
1189 * @param workerNodeKey - The worker node key.
1190 * @param task - The task to execute.
1192 protected beforeTaskExecutionHook (
1193 workerNodeKey
: number,
1196 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1197 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1198 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1199 ++workerUsage
.tasks
.executing
1200 updateWaitTimeWorkerUsage(
1201 this.workerChoiceStrategiesContext
,
1207 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1208 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1209 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(task
.name
!) !=
1212 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1213 const taskFunctionWorkerUsage
= this.workerNodes
[
1215 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1216 ].getTaskFunctionWorkerUsage(task
.name
!)!
1217 ++taskFunctionWorkerUsage
.tasks
.executing
1218 updateWaitTimeWorkerUsage(
1219 this.workerChoiceStrategiesContext
,
1220 taskFunctionWorkerUsage
,
1227 * Hook executed after the worker task execution.
1228 * Can be overridden.
1230 * @param workerNodeKey - The worker node key.
1231 * @param message - The received message.
1233 protected afterTaskExecutionHook (
1234 workerNodeKey
: number,
1235 message
: MessageValue
<Response
>
1237 let needWorkerChoiceStrategyUpdate
= false
1238 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1239 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1240 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1241 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1242 updateRunTimeWorkerUsage(
1243 this.workerChoiceStrategiesContext
,
1247 updateEluWorkerUsage(
1248 this.workerChoiceStrategiesContext
,
1252 needWorkerChoiceStrategyUpdate
= true
1255 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1256 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1257 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1258 message
.taskPerformance
!.name
1261 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1262 const taskFunctionWorkerUsage
= this.workerNodes
[
1264 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1265 ].getTaskFunctionWorkerUsage(message
.taskPerformance
!.name
)!
1266 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1267 updateRunTimeWorkerUsage(
1268 this.workerChoiceStrategiesContext
,
1269 taskFunctionWorkerUsage
,
1272 updateEluWorkerUsage(
1273 this.workerChoiceStrategiesContext
,
1274 taskFunctionWorkerUsage
,
1277 needWorkerChoiceStrategyUpdate
= true
1279 if (needWorkerChoiceStrategyUpdate
) {
1280 this.workerChoiceStrategiesContext
?.update(workerNodeKey
)
1285 * Whether the worker node shall update its task function worker usage or not.
1287 * @param workerNodeKey - The worker node key.
1288 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1290 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1291 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1293 workerInfo
!= null &&
1294 Array.isArray(workerInfo
.taskFunctionsProperties
) &&
1295 workerInfo
.taskFunctionsProperties
.length
> 2
1300 * Chooses a worker node for the next task given the worker choice strategy.
1302 * @param workerChoiceStrategy - The worker choice strategy.
1303 * @returns The chosen worker node key
1305 private chooseWorkerNode (
1306 workerChoiceStrategy
?: WorkerChoiceStrategy
1308 if (this.shallCreateDynamicWorker()) {
1309 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1311 this.workerChoiceStrategiesContext
?.getPolicy().dynamicWorkerUsage
===
1314 return workerNodeKey
1317 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1318 return this.workerChoiceStrategiesContext
!.execute(workerChoiceStrategy
)
1322 * Conditions for dynamic worker creation.
1324 * @returns Whether to create a dynamic worker or not.
1326 protected abstract shallCreateDynamicWorker (): boolean
1329 * Sends a message to worker given its worker node key.
1331 * @param workerNodeKey - The worker node key.
1332 * @param message - The message.
1333 * @param transferList - The optional array of transferable objects.
1335 protected abstract sendToWorker (
1336 workerNodeKey
: number,
1337 message
: MessageValue
<Data
>,
1338 transferList
?: readonly TransferListItem
[]
1342 * Creates a new, completely set up worker node.
1344 * @returns New, completely set up worker node key.
1346 protected createAndSetupWorkerNode (): number {
1347 const workerNode
= this.createWorkerNode()
1348 workerNode
.registerWorkerEventHandler(
1350 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1352 workerNode
.registerWorkerEventHandler(
1354 this.opts
.messageHandler
?? EMPTY_FUNCTION
1356 workerNode
.registerWorkerEventHandler(
1358 this.opts
.errorHandler
?? EMPTY_FUNCTION
1360 workerNode
.registerOnceWorkerEventHandler('error', (error
: Error) => {
1361 workerNode
.info
.ready
= false
1362 this.emitter
?.emit(PoolEvents
.error
, error
)
1366 this.opts
.restartWorkerOnError
=== true
1368 if (workerNode
.info
.dynamic
) {
1369 this.createAndSetupDynamicWorkerNode()
1370 } else if (!this.startingMinimumNumberOfWorkers
) {
1371 this.startMinimumNumberOfWorkers()
1377 this.opts
.enableTasksQueue
=== true
1379 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
1381 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1382 workerNode
?.terminate().catch((error
: unknown
) => {
1383 this.emitter
?.emit(PoolEvents
.error
, error
)
1386 workerNode
.registerWorkerEventHandler(
1388 this.opts
.exitHandler
?? EMPTY_FUNCTION
1390 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1391 this.removeWorkerNode(workerNode
)
1394 !this.startingMinimumNumberOfWorkers
&&
1397 this.startMinimumNumberOfWorkers()
1400 const workerNodeKey
= this.addWorkerNode(workerNode
)
1401 this.afterWorkerNodeSetup(workerNodeKey
)
1402 return workerNodeKey
1406 * Creates a new, completely set up dynamic worker node.
1408 * @returns New, completely set up dynamic worker node key.
1410 protected createAndSetupDynamicWorkerNode (): number {
1411 const workerNodeKey
= this.createAndSetupWorkerNode()
1412 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1413 this.checkMessageWorkerId(message
)
1414 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1417 const workerUsage
= this.workerNodes
[localWorkerNodeKey
]?.usage
1418 // Kill message received from worker
1420 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1421 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1422 ((this.opts
.enableTasksQueue
=== false &&
1423 workerUsage
.tasks
.executing
=== 0) ||
1424 (this.opts
.enableTasksQueue
=== true &&
1425 workerUsage
.tasks
.executing
=== 0 &&
1426 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1428 // Flag the worker node as not ready immediately
1429 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1430 this.destroyWorkerNode(localWorkerNodeKey
).catch((error
: unknown
) => {
1431 this.emitter
?.emit(PoolEvents
.error
, error
)
1435 this.sendToWorker(workerNodeKey
, {
1438 if (this.taskFunctions
.size
> 0) {
1439 for (const [taskFunctionName
, taskFunctionObject
] of this.taskFunctions
) {
1440 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1441 taskFunctionOperation
: 'add',
1442 taskFunctionProperties
: buildTaskFunctionProperties(
1446 taskFunction
: taskFunctionObject
.taskFunction
.toString()
1447 }).catch((error
: unknown
) => {
1448 this.emitter
?.emit(PoolEvents
.error
, error
)
1452 const workerNode
= this.workerNodes
[workerNodeKey
]
1453 workerNode
.info
.dynamic
= true
1455 this.workerChoiceStrategiesContext
?.getPolicy().dynamicWorkerReady
===
1457 this.workerChoiceStrategiesContext
?.getPolicy().dynamicWorkerUsage
===
1460 workerNode
.info
.ready
= true
1462 this.checkAndEmitDynamicWorkerCreationEvents()
1463 return workerNodeKey
1467 * Registers a listener callback on the worker given its worker node key.
1469 * @param workerNodeKey - The worker node key.
1470 * @param listener - The message listener callback.
1472 protected abstract registerWorkerMessageListener
<
1473 Message
extends Data
| Response
1475 workerNodeKey
: number,
1476 listener
: (message
: MessageValue
<Message
>) => void
1480 * Registers once a listener callback on the worker given its worker node key.
1482 * @param workerNodeKey - The worker node key.
1483 * @param listener - The message listener callback.
1485 protected abstract registerOnceWorkerMessageListener
<
1486 Message
extends Data
| Response
1488 workerNodeKey
: number,
1489 listener
: (message
: MessageValue
<Message
>) => void
1493 * Deregisters a listener callback on the worker given its worker node key.
1495 * @param workerNodeKey - The worker node key.
1496 * @param listener - The message listener callback.
1498 protected abstract deregisterWorkerMessageListener
<
1499 Message
extends Data
| Response
1501 workerNodeKey
: number,
1502 listener
: (message
: MessageValue
<Message
>) => void
1506 * Method hooked up after a worker node has been newly created.
1507 * Can be overridden.
1509 * @param workerNodeKey - The newly created worker node key.
1511 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1512 // Listen to worker messages.
1513 this.registerWorkerMessageListener(
1515 this.workerMessageListener
1517 // Send the startup message to worker.
1518 this.sendStartupMessageToWorker(workerNodeKey
)
1519 // Send the statistics message to worker.
1520 this.sendStatisticsMessageToWorker(workerNodeKey
)
1521 if (this.opts
.enableTasksQueue
=== true) {
1522 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1523 this.workerNodes
[workerNodeKey
].on(
1525 this.handleWorkerNodeIdleEvent
1528 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1529 this.workerNodes
[workerNodeKey
].on(
1531 this.handleWorkerNodeBackPressureEvent
1538 * Sends the startup message to worker given its worker node key.
1540 * @param workerNodeKey - The worker node key.
1542 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1545 * Sends the statistics message to worker given its worker node key.
1547 * @param workerNodeKey - The worker node key.
1549 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1550 this.sendToWorker(workerNodeKey
, {
1553 this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
1554 .runTime
.aggregate
?? false,
1556 this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
1557 .elu
.aggregate
?? false
1562 private cannotStealTask (): boolean {
1563 return this.workerNodes
.length
<= 1 || this.info
.queuedTasks
=== 0
1566 private handleTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1567 if (this.shallExecuteTask(workerNodeKey
)) {
1568 this.executeTask(workerNodeKey
, task
)
1570 this.enqueueTask(workerNodeKey
, task
)
1574 private redistributeQueuedTasks (workerNodeKey
: number): void {
1575 if (workerNodeKey
=== -1 || this.cannotStealTask()) {
1578 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1579 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1580 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1581 return workerNode
.info
.ready
&&
1582 workerNode
.usage
.tasks
.queued
<
1583 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1590 destinationWorkerNodeKey
,
1591 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1592 this.dequeueTask(workerNodeKey
)!
1597 private updateTaskStolenStatisticsWorkerUsage (
1598 workerNodeKey
: number,
1601 const workerNode
= this.workerNodes
[workerNodeKey
]
1602 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1603 if (workerNode
?.usage
!= null) {
1604 ++workerNode
.usage
.tasks
.stolen
1607 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1608 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1610 const taskFunctionWorkerUsage
=
1611 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1612 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1613 ++taskFunctionWorkerUsage
.tasks
.stolen
1617 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1618 workerNodeKey
: number
1620 const workerNode
= this.workerNodes
[workerNodeKey
]
1621 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1622 if (workerNode
?.usage
!= null) {
1623 ++workerNode
.usage
.tasks
.sequentiallyStolen
1627 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1628 workerNodeKey
: number,
1631 const workerNode
= this.workerNodes
[workerNodeKey
]
1633 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1634 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1636 const taskFunctionWorkerUsage
=
1637 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1638 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1639 ++taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
1643 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1644 workerNodeKey
: number
1646 const workerNode
= this.workerNodes
[workerNodeKey
]
1647 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1648 if (workerNode
?.usage
!= null) {
1649 workerNode
.usage
.tasks
.sequentiallyStolen
= 0
1653 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1654 workerNodeKey
: number,
1657 const workerNode
= this.workerNodes
[workerNodeKey
]
1659 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1660 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1662 const taskFunctionWorkerUsage
=
1663 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1664 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1665 taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
= 0
1669 private readonly handleWorkerNodeIdleEvent
= (
1670 eventDetail
: WorkerNodeEventDetail
,
1671 previousStolenTask
?: Task
<Data
>
1673 const { workerNodeKey
} = eventDetail
1674 if (workerNodeKey
== null) {
1676 "WorkerNode event detail 'workerNodeKey' property must be defined"
1679 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1681 this.cannotStealTask() ||
1682 (this.info
.stealingWorkerNodes
?? 0) >
1683 Math.floor(this.workerNodes
.length
/ 2)
1685 if (workerInfo
!= null && previousStolenTask
!= null) {
1686 workerInfo
.stealing
= false
1690 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1692 workerInfo
!= null &&
1693 previousStolenTask
!= null &&
1694 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1695 (workerNodeTasksUsage
.executing
> 0 ||
1696 this.tasksQueueSize(workerNodeKey
) > 0)
1698 workerInfo
.stealing
= false
1699 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1700 for (const taskFunctionProperties
of workerInfo
.taskFunctionsProperties
!) {
1701 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1703 taskFunctionProperties
.name
1706 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1709 if (workerInfo
== null) {
1711 `Worker node with key '${workerNodeKey}' not found in pool`
1714 workerInfo
.stealing
= true
1715 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1717 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1720 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1721 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1723 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1724 ].getTaskFunctionWorkerUsage(stolenTask
.name
!)!.tasks
1726 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1727 (previousStolenTask
!= null &&
1728 previousStolenTask
.name
=== stolenTask
.name
&&
1729 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1731 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1733 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1737 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1739 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1744 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1746 this.handleWorkerNodeIdleEvent(eventDetail
, stolenTask
)
1749 .catch((error
: unknown
) => {
1750 this.emitter
?.emit(PoolEvents
.error
, error
)
1754 private readonly workerNodeStealTask
= (
1755 workerNodeKey
: number
1756 ): Task
<Data
> | undefined => {
1757 const workerNodes
= this.workerNodes
1760 (workerNodeA
, workerNodeB
) =>
1761 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1763 const sourceWorkerNode
= workerNodes
.find(
1764 (sourceWorkerNode
, sourceWorkerNodeKey
) =>
1765 sourceWorkerNode
.info
.ready
&&
1766 !sourceWorkerNode
.info
.stealing
&&
1767 sourceWorkerNodeKey
!== workerNodeKey
&&
1768 sourceWorkerNode
.usage
.tasks
.queued
> 0
1770 if (sourceWorkerNode
!= null) {
1771 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1772 const task
= sourceWorkerNode
.dequeueTask(1)!
1773 this.handleTask(workerNodeKey
, task
)
1774 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1775 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1776 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey
, task
.name
!)
1781 private readonly handleWorkerNodeBackPressureEvent
= (
1782 eventDetail
: WorkerNodeEventDetail
1785 this.cannotStealTask() ||
1786 (this.info
.stealingWorkerNodes
?? 0) >
1787 Math.floor(this.workerNodes
.length
/ 2)
1791 const { workerId
} = eventDetail
1792 const sizeOffset
= 1
1793 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1794 if (this.opts
.tasksQueueOptions
!.size
! <= sizeOffset
) {
1797 const sourceWorkerNode
=
1798 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1799 const workerNodes
= this.workerNodes
1802 (workerNodeA
, workerNodeB
) =>
1803 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1805 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1807 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1808 workerNode
.info
.ready
&&
1809 !workerNode
.info
.stealing
&&
1810 workerNode
.info
.id
!== workerId
&&
1811 workerNode
.usage
.tasks
.queued
<
1812 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1813 this.opts
.tasksQueueOptions
!.size
! - sizeOffset
1815 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1816 if (workerInfo
== null) {
1818 `Worker node with key '${workerNodeKey}' not found in pool`
1821 workerInfo
.stealing
= true
1822 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1823 const task
= sourceWorkerNode
.dequeueTask(1)!
1824 this.handleTask(workerNodeKey
, task
)
1825 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1826 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey
, task
.name
!)
1827 workerInfo
.stealing
= false
1833 * This method is the message listener registered on each worker.
1835 protected readonly workerMessageListener
= (
1836 message
: MessageValue
<Response
>
1838 this.checkMessageWorkerId(message
)
1839 const { workerId
, ready
, taskId
, taskFunctionsProperties
} = message
1840 if (ready
!= null && taskFunctionsProperties
!= null) {
1841 // Worker ready response received from worker
1842 this.handleWorkerReadyResponse(message
)
1843 } else if (taskFunctionsProperties
!= null) {
1844 // Task function properties message received from worker
1845 const workerInfo
= this.getWorkerInfo(
1846 this.getWorkerNodeKeyByWorkerId(workerId
)
1848 if (workerInfo
!= null) {
1849 workerInfo
.taskFunctionsProperties
= taskFunctionsProperties
1851 } else if (taskId
!= null) {
1852 // Task execution response received from worker
1853 this.handleTaskExecutionResponse(message
)
1857 private checkAndEmitReadyEvent (): void {
1858 if (!this.readyEventEmitted
&& this.ready
) {
1859 this.emitter
?.emit(PoolEvents
.ready
, this.info
)
1860 this.readyEventEmitted
= true
1864 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1865 const { workerId
, ready
, taskFunctionsProperties
} = message
1866 if (ready
== null || !ready
) {
1867 throw new Error(`Worker ${workerId} failed to initialize`)
1870 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1871 workerNode
.info
.ready
= ready
1872 workerNode
.info
.taskFunctionsProperties
= taskFunctionsProperties
1873 this.checkAndEmitReadyEvent()
1876 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1877 const { workerId
, taskId
, workerError
, data
} = message
1878 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1879 const promiseResponse
= this.promiseResponseMap
.get(taskId
!)
1880 if (promiseResponse
!= null) {
1881 const { resolve
, reject
, workerNodeKey
, asyncResource
} = promiseResponse
1882 const workerNode
= this.workerNodes
[workerNodeKey
]
1883 if (workerError
!= null) {
1884 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1885 asyncResource
!= null
1886 ? asyncResource
.runInAsyncScope(
1891 : reject(workerError
.message
)
1893 asyncResource
!= null
1894 ? asyncResource
.runInAsyncScope(resolve
, this.emitter
, data
)
1895 : resolve(data
as Response
)
1897 asyncResource
?.emitDestroy()
1898 this.afterTaskExecutionHook(workerNodeKey
, message
)
1899 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1900 this.promiseResponseMap
.delete(taskId
!)
1901 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1902 workerNode
?.emit('taskFinished', taskId
)
1904 this.opts
.enableTasksQueue
=== true &&
1906 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1909 const workerNodeTasksUsage
= workerNode
.usage
.tasks
1911 this.tasksQueueSize(workerNodeKey
) > 0 &&
1912 workerNodeTasksUsage
.executing
<
1913 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1914 this.opts
.tasksQueueOptions
!.concurrency
!
1916 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1917 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
1920 workerNodeTasksUsage
.executing
=== 0 &&
1921 this.tasksQueueSize(workerNodeKey
) === 0 &&
1922 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1924 workerNode
.emit('idle', {
1933 private checkAndEmitTaskExecutionEvents (): void {
1935 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1939 private checkAndEmitTaskQueuingEvents (): void {
1940 if (this.hasBackPressure()) {
1941 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1946 * Emits dynamic worker creation events.
1948 protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1951 * Gets the worker information given its worker node key.
1953 * @param workerNodeKey - The worker node key.
1954 * @returns The worker information.
1956 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
| undefined {
1957 return this.workerNodes
[workerNodeKey
]?.info
1961 * Creates a worker node.
1963 * @returns The created worker node.
1965 private createWorkerNode (): IWorkerNode
<Worker
, Data
> {
1966 const workerNode
= new WorkerNode
<Worker
, Data
>(
1971 workerOptions
: this.opts
.workerOptions
,
1972 tasksQueueBackPressureSize
:
1973 this.opts
.tasksQueueOptions
?.size
??
1974 getDefaultTasksQueueOptions(
1975 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1977 tasksQueueBucketSize
:
1978 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
) * 2
1981 // Flag the worker node as ready at pool startup.
1982 if (this.starting
) {
1983 workerNode
.info
.ready
= true
1989 * Adds the given worker node in the pool worker nodes.
1991 * @param workerNode - The worker node.
1992 * @returns The added worker node key.
1993 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1995 private addWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): number {
1996 this.workerNodes
.push(workerNode
)
1997 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
1998 if (workerNodeKey
=== -1) {
1999 throw new Error('Worker added not found in worker nodes')
2001 return workerNodeKey
2004 private checkAndEmitEmptyEvent (): void {
2006 this.emitter
?.emit(PoolEvents
.empty
, this.info
)
2007 this.readyEventEmitted
= false
2012 * Removes the worker node from the pool worker nodes.
2014 * @param workerNode - The worker node.
2016 private removeWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): void {
2017 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
2018 if (workerNodeKey
!== -1) {
2019 this.workerNodes
.splice(workerNodeKey
, 1)
2020 this.workerChoiceStrategiesContext
?.remove(workerNodeKey
)
2022 this.checkAndEmitEmptyEvent()
2025 protected flagWorkerNodeAsNotReady (workerNodeKey
: number): void {
2026 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
2027 if (workerInfo
!= null) {
2028 workerInfo
.ready
= false
2032 private hasBackPressure (): boolean {
2034 this.opts
.enableTasksQueue
=== true &&
2035 this.workerNodes
.findIndex(
2036 workerNode
=> !workerNode
.hasBackPressure()
2042 * Executes the given task on the worker given its worker node key.
2044 * @param workerNodeKey - The worker node key.
2045 * @param task - The task to execute.
2047 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
2048 this.beforeTaskExecutionHook(workerNodeKey
, task
)
2049 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
2050 this.checkAndEmitTaskExecutionEvents()
2053 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
2054 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
2055 this.checkAndEmitTaskQueuingEvents()
2056 return tasksQueueSize
2059 private dequeueTask (
2060 workerNodeKey
: number,
2062 ): Task
<Data
> | undefined {
2063 return this.workerNodes
[workerNodeKey
].dequeueTask(bucket
)
2066 private tasksQueueSize (workerNodeKey
: number): number {
2067 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
2070 protected flushTasksQueue (workerNodeKey
: number): number {
2071 let flushedTasks
= 0
2072 while (this.tasksQueueSize(workerNodeKey
) > 0) {
2073 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
2074 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
2077 this.workerNodes
[workerNodeKey
].clearTasksQueue()
2081 private flushTasksQueues (): void {
2082 for (const workerNodeKey
of this.workerNodes
.keys()) {
2083 this.flushTasksQueue(workerNodeKey
)