1 import { AsyncResource
} from
'node:async_hooks'
2 import { randomUUID
} from
'node:crypto'
3 import { EventEmitterAsyncResource
} from
'node:events'
4 import { performance
} from
'node:perf_hooks'
5 import type { TransferListItem
} from
'node:worker_threads'
9 PromiseResponseWrapper
,
11 TaskFunctionProperties
12 } from
'../utility-types.js'
15 buildTaskFunctionProperties
,
30 } from
'../worker/task-functions.js'
31 import { KillBehaviors
} from
'../worker/worker-options.js'
39 type TasksQueueOptions
43 WorkerChoiceStrategies
,
44 type WorkerChoiceStrategy
,
45 type WorkerChoiceStrategyOptions
46 } from
'./selection-strategies/selection-strategies-types.js'
47 import { WorkerChoiceStrategiesContext
} from
'./selection-strategies/worker-choice-strategies-context.js'
50 checkValidTasksQueueOptions
,
51 checkValidWorkerChoiceStrategy
,
52 getDefaultTasksQueueOptions
,
54 updateRunTimeWorkerUsage
,
55 updateTaskStatisticsWorkerUsage
,
56 updateWaitTimeWorkerUsage
,
59 import { version
} from
'./version.js'
64 WorkerNodeEventDetail
,
67 import { WorkerNode
} from
'./worker-node.js'
70 * Base class that implements some shared logic for all poolifier pools.
72 * @typeParam Worker - Type of worker which manages this pool.
73 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
74 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
76 export abstract class AbstractPool
<
77 Worker
extends IWorker
,
80 > implements IPool
<Worker
, Data
, Response
> {
82 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
85 public emitter
?: EventEmitterAsyncResource
88 * The task execution response promise map:
89 * - `key`: The message id of each submitted task.
90 * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
92 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
94 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
95 new Map
<string, PromiseResponseWrapper
<Response
>>()
98 * Worker choice strategies context referencing worker choice algorithms implementation.
100 protected workerChoiceStrategiesContext
?: WorkerChoiceStrategiesContext
<
107 * The task functions added at runtime map:
108 * - `key`: The task function name.
109 * - `value`: The task function object.
111 private readonly taskFunctions
: Map
<
113 TaskFunctionObject
<Data
, Response
>
117 * Whether the pool is started or not.
119 private started
: boolean
121 * Whether the pool is starting or not.
123 private starting
: boolean
125 * Whether the pool is destroying or not.
127 private destroying
: boolean
129 * Whether the minimum number of workers is starting or not.
131 private startingMinimumNumberOfWorkers
: boolean
133 * Whether the pool ready event has been emitted or not.
135 private readyEventEmitted
: boolean
137 * The start timestamp of the pool.
139 private readonly startTimestamp
142 * Constructs a new poolifier pool.
144 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
145 * @param filePath - Path to the worker file.
146 * @param opts - Options for the pool.
147 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
150 protected readonly minimumNumberOfWorkers
: number,
151 protected readonly filePath
: string,
152 protected readonly opts
: PoolOptions
<Worker
>,
153 protected readonly maximumNumberOfWorkers
?: number
155 if (!this.isMain()) {
157 'Cannot start a pool from a worker with the same type as the pool'
161 checkFilePath(this.filePath
)
162 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
163 this.checkPoolOptions(this.opts
)
165 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
166 this.executeTask
= this.executeTask
.bind(this)
167 this.enqueueTask
= this.enqueueTask
.bind(this)
169 if (this.opts
.enableEvents
=== true) {
170 this.initializeEventEmitter()
172 this.workerChoiceStrategiesContext
= new WorkerChoiceStrategiesContext
<
178 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
179 [this.opts
.workerChoiceStrategy
!],
180 this.opts
.workerChoiceStrategyOptions
185 this.taskFunctions
= new Map
<string, TaskFunctionObject
<Data
, Response
>>()
188 this.starting
= false
189 this.destroying
= false
190 this.readyEventEmitted
= false
191 this.startingMinimumNumberOfWorkers
= false
192 if (this.opts
.startWorkers
=== true) {
196 this.startTimestamp
= performance
.now()
199 private checkPoolType (): void {
200 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
202 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
207 private checkMinimumNumberOfWorkers (
208 minimumNumberOfWorkers
: number | undefined
210 if (minimumNumberOfWorkers
== null) {
212 'Cannot instantiate a pool without specifying the number of workers'
214 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
216 'Cannot instantiate a pool with a non safe integer number of workers'
218 } else if (minimumNumberOfWorkers
< 0) {
219 throw new RangeError(
220 'Cannot instantiate a pool with a negative number of workers'
222 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
223 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
/**
 * Validates the given pool options and stores them, with defaults applied, on `this.opts`.
 *
 * @param opts - The pool options to check.
 * @throws TypeError If `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  // Bail out first so the happy path below reads top to bottom.
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validated first, round robin when unset.
  const { workerChoiceStrategy } = opts
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  poolOptions.workerChoiceStrategy =
    workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  // Worker choice strategy options are only stored when provided.
  const { workerChoiceStrategyOptions } = opts
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  if (workerChoiceStrategyOptions != null) {
    poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are validated and built only when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const { tasksQueueOptions } = opts
    checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
253 private checkValidWorkerChoiceStrategyOptions (
254 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
257 workerChoiceStrategyOptions
!= null &&
258 !isPlainObject(workerChoiceStrategyOptions
)
261 'Invalid worker choice strategy options: must be a plain object'
265 workerChoiceStrategyOptions
?.weights
!= null &&
266 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
267 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
270 'Invalid worker choice strategy options: must have a weight for each worker node'
274 workerChoiceStrategyOptions
?.measurement
!= null &&
275 !Object.values(Measurements
).includes(
276 workerChoiceStrategyOptions
.measurement
280 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
292 public get
info (): PoolInfo
{
297 started
: this.started
,
299 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
300 defaultStrategy
: this.opts
.workerChoiceStrategy
!,
301 strategyRetries
: this.workerChoiceStrategiesContext
?.retriesCount
?? 0,
302 minSize
: this.minimumNumberOfWorkers
,
303 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
304 ...(this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
305 .runTime
.aggregate
=== true &&
306 this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
307 .waitTime
.aggregate
&& {
308 utilization
: round(this.utilization
)
310 workerNodes
: this.workerNodes
.length
,
311 idleWorkerNodes
: this.workerNodes
.reduce(
312 (accumulator
, workerNode
) =>
313 workerNode
.usage
.tasks
.executing
=== 0
318 ...(this.opts
.enableTasksQueue
=== true && {
319 stealingWorkerNodes
: this.workerNodes
.reduce(
320 (accumulator
, workerNode
) =>
321 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
325 busyWorkerNodes
: this.workerNodes
.reduce(
326 (accumulator
, _workerNode
, workerNodeKey
) =>
327 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
330 executedTasks
: this.workerNodes
.reduce(
331 (accumulator
, workerNode
) =>
332 accumulator
+ workerNode
.usage
.tasks
.executed
,
335 executingTasks
: this.workerNodes
.reduce(
336 (accumulator
, workerNode
) =>
337 accumulator
+ workerNode
.usage
.tasks
.executing
,
340 ...(this.opts
.enableTasksQueue
=== true && {
341 queuedTasks
: this.workerNodes
.reduce(
342 (accumulator
, workerNode
) =>
343 accumulator
+ workerNode
.usage
.tasks
.queued
,
347 ...(this.opts
.enableTasksQueue
=== true && {
348 maxQueuedTasks
: this.workerNodes
.reduce(
349 (accumulator
, workerNode
) =>
350 accumulator
+ (workerNode
.usage
.tasks
.maxQueued
?? 0),
354 ...(this.opts
.enableTasksQueue
=== true && {
355 backPressure
: this.hasBackPressure()
357 ...(this.opts
.enableTasksQueue
=== true && {
358 stolenTasks
: this.workerNodes
.reduce(
359 (accumulator
, workerNode
) =>
360 accumulator
+ workerNode
.usage
.tasks
.stolen
,
364 failedTasks
: this.workerNodes
.reduce(
365 (accumulator
, workerNode
) =>
366 accumulator
+ workerNode
.usage
.tasks
.failed
,
369 ...(this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
370 .runTime
.aggregate
=== true && {
374 ...this.workerNodes
.map(
375 workerNode
=> workerNode
.usage
.runTime
.minimum
?? Infinity
381 ...this.workerNodes
.map(
382 workerNode
=> workerNode
.usage
.runTime
.maximum
?? -Infinity
386 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
387 .runTime
.average
&& {
390 this.workerNodes
.reduce
<number[]>(
391 (accumulator
, workerNode
) =>
392 accumulator
.concat(workerNode
.usage
.runTime
.history
),
398 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
402 this.workerNodes
.reduce
<number[]>(
403 (accumulator
, workerNode
) =>
404 accumulator
.concat(workerNode
.usage
.runTime
.history
),
412 ...(this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
413 .waitTime
.aggregate
=== true && {
417 ...this.workerNodes
.map(
418 workerNode
=> workerNode
.usage
.waitTime
.minimum
?? Infinity
424 ...this.workerNodes
.map(
425 workerNode
=> workerNode
.usage
.waitTime
.maximum
?? -Infinity
429 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
430 .waitTime
.average
&& {
433 this.workerNodes
.reduce
<number[]>(
434 (accumulator
, workerNode
) =>
435 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
441 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
442 .waitTime
.median
&& {
445 this.workerNodes
.reduce
<number[]>(
446 (accumulator
, workerNode
) =>
447 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
459 * The pool readiness boolean status.
461 private get
ready (): boolean {
466 this.workerNodes
.reduce(
467 (accumulator
, workerNode
) =>
468 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
472 ) >= this.minimumNumberOfWorkers
/**
 * Whether the pool is empty: the minimum number of workers is zero
 * and no worker node currently exists.
 */
protected get empty (): boolean {
  if (this.minimumNumberOfWorkers !== 0) {
    return false
  }
  return this.workerNodes.length === 0
}
/**
 * The approximate pool utilization: the summed aggregate run time and wait time
 * of all worker nodes, divided by the time elapsed since the pool start timestamp
 * multiplied by the maximum (or, when unset, minimum) number of workers.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const workersCount =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  let runTimeSum = 0
  let waitTimeSum = 0
  for (const { usage } of this.workerNodes) {
    // Missing aggregates count as zero.
    runTimeSum += usage.runTime.aggregate ?? 0
    waitTimeSum += usage.waitTime.aggregate ?? 0
  }
  return (runTimeSum + waitTimeSum) / (elapsedTime * workersCount)
}
508 * If it is `'dynamic'`, it provides the `max` property.
510 protected abstract get
type (): PoolType
515 protected abstract get
worker (): WorkerType
/**
 * Ensures that a message received from a worker carries the id of a worker known to this pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or matches no worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
/**
 * Looks up the key (array index) of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The key of the first matching worker node, `-1` when none matches.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, { info }] of this.workerNodes.entries()) {
    if (info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
/**
 * Sets the default worker choice strategy of the pool.
 *
 * The strategy is validated, the optional strategy options are applied, and, only
 * when the strategy differs from the current one, the strategies context is updated
 * and a statistics message is sent to every worker node.
 *
 * @param workerChoiceStrategy - The worker choice strategy to use by default.
 * @param workerChoiceStrategyOptions - The optional worker choice strategy options.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Nothing else to do when the strategy is unchanged.
  if (workerChoiceStrategy === this.opts.workerChoiceStrategy) {
    return
  }
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  const strategiesContext = this.workerChoiceStrategiesContext
  strategiesContext?.setDefaultWorkerChoiceStrategy(
    workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )
  strategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerWorkerChoiceStrategies(),
    this.opts.workerChoiceStrategyOptions
  )
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
/**
 * Sets the worker choice strategy options of the pool.
 *
 * The options are validated, stored on the pool options when provided, and the
 * currently stored options are then handed to the strategies context.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const poolOptions = this.opts
  if (workerChoiceStrategyOptions != null) {
    poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  // Always forward the stored options, even when none were passed in.
  const storedOptions = poolOptions.workerChoiceStrategyOptions
  this.workerChoiceStrategiesContext?.setOptions(storedOptions)
}
/**
 * Enables or disables the tasks queue of the pool.
 *
 * When an enabled queue gets disabled, task stealing handlers are removed and the
 * tasks queues are flushed before the new state and options are applied.
 *
 * @param enable - Whether to enable the tasks queue.
 * @param tasksQueueOptions - The optional tasks queue options.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
/**
 * Sets the tasks queue options of the pool.
 *
 * With the tasks queue enabled, the options are validated and rebuilt, the queue
 * size is applied to the worker nodes and the stealing handlers are reset according
 * to the `taskStealing` and `tasksStealingOnBackPressure` flags. With the tasks queue
 * disabled, any stored tasks queue options are deleted.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const queueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = queueOptions
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(queueOptions.size!)

  // Handlers are always removed first, then registered again when the flag is set.
  this.unsetTaskStealing()
  if (queueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (queueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
/**
 * Builds the effective tasks queue options: the defaults computed for the maximum
 * (or, when unset, minimum) number of workers, overridden by the given options.
 *
 * @param tasksQueueOptions - The tasks queue options to layer over the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const workersCount =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const defaultOptions = getDefaultTasksQueueOptions(workersCount)
  return { ...defaultOptions, ...tasksQueueOptions }
}
/**
 * Applies the given size as the tasks queue back pressure size of every worker node.
 *
 * @param size - The size to set on each worker node.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(node => {
    node.tasksQueueBackPressureSize = size
  })
}
/**
 * Registers the idle event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const node of this.workerNodes) {
    node.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
/**
 * Removes the idle event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const node of this.workerNodes) {
    node.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
656 private setTasksStealingOnBackPressure (): void {
657 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
658 this.workerNodes
[workerNodeKey
].on(
660 this.handleWorkerNodeBackPressureEvent
665 private unsetTasksStealingOnBackPressure (): void {
666 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
667 this.workerNodes
[workerNodeKey
].off(
669 this.handleWorkerNodeBackPressureEvent
/**
 * Whether the pool is full or not: the number of worker nodes has reached the
 * maximum (or, when unset, minimum) number of workers.
 */
protected get full (): boolean {
  const capacity = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= capacity
}
687 * Whether the pool is busy or not.
689 * The pool busyness boolean status.
691 protected abstract get
busy (): boolean
694 * Whether worker nodes are executing concurrently their tasks quota or not.
696 * @returns Worker nodes busyness boolean status.
698 protected internalBusy (): boolean {
699 if (this.opts
.enableTasksQueue
=== true) {
701 this.workerNodes
.findIndex(
703 workerNode
.info
.ready
&&
704 workerNode
.usage
.tasks
.executing
<
705 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
706 this.opts
.tasksQueueOptions
!.concurrency
!
711 this.workerNodes
.findIndex(
713 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
/**
 * Tells whether a worker node is busy.
 *
 * With the tasks queue enabled, a node is busy once its executing tasks count
 * reaches the configured concurrency; otherwise as soon as it executes any task.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return this.opts.enableTasksQueue === true
    ? // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    executing >= this.opts.tasksQueueOptions!.concurrency!
    : executing > 0
}
729 private async sendTaskFunctionOperationToWorker (
730 workerNodeKey
: number,
731 message
: MessageValue
<Data
>
732 ): Promise
<boolean> {
733 return await new Promise
<boolean>((resolve
, reject
) => {
734 const taskFunctionOperationListener
= (
735 message
: MessageValue
<Response
>
737 this.checkMessageWorkerId(message
)
738 const workerId
= this.getWorkerInfo(workerNodeKey
)?.id
740 message
.taskFunctionOperationStatus
!= null &&
741 message
.workerId
=== workerId
743 if (message
.taskFunctionOperationStatus
) {
748 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
752 this.deregisterWorkerMessageListener(
753 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
754 taskFunctionOperationListener
758 this.registerWorkerMessageListener(
760 taskFunctionOperationListener
762 this.sendToWorker(workerNodeKey
, message
)
766 private async sendTaskFunctionOperationToWorkers (
767 message
: MessageValue
<Data
>
768 ): Promise
<boolean> {
769 return await new Promise
<boolean>((resolve
, reject
) => {
770 const responsesReceived
= new Array<MessageValue
<Response
>>()
771 const taskFunctionOperationsListener
= (
772 message
: MessageValue
<Response
>
774 this.checkMessageWorkerId(message
)
775 if (message
.taskFunctionOperationStatus
!= null) {
776 responsesReceived
.push(message
)
777 if (responsesReceived
.length
=== this.workerNodes
.length
) {
779 responsesReceived
.every(
780 message
=> message
.taskFunctionOperationStatus
=== true
785 responsesReceived
.some(
786 message
=> message
.taskFunctionOperationStatus
=== false
789 const errorResponse
= responsesReceived
.find(
790 response
=> response
.taskFunctionOperationStatus
=== false
794 `Task function operation '${
795 message.taskFunctionOperation as string
796 }' failed on worker ${errorResponse?.workerId} with error: '${
797 errorResponse?.workerError?.message
802 this.deregisterWorkerMessageListener(
803 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
804 taskFunctionOperationsListener
809 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
810 this.registerWorkerMessageListener(
812 taskFunctionOperationsListener
814 this.sendToWorker(workerNodeKey
, message
)
/**
 * Tells whether a task function with the given name is listed by the pool.
 *
 * @param name - The task function name.
 * @returns `true` if a listed task function has that name, `false` otherwise.
 */
public hasTaskFunction (name: string): boolean {
  for (const properties of this.listTaskFunctionsProperties()) {
    if (properties.name === name) {
      return true
    }
  }
  return false
}
/**
 * Adds a task function to the pool.
 *
 * The arguments are validated, an 'add' task function operation is sent to the
 * workers, then the task function is recorded in the pool and the worker choice
 * strategies are synchronized.
 *
 * @param name - The task function name.
 * @param fn - The task function, or an object holding it in its `taskFunction` property.
 * @returns The result of the 'add' operation sent to the workers.
 * @throws TypeError If `name` is not a non-empty string or `taskFunction` is not a function.
 */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // `name` is known to be a string from here on.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  // Normalize a bare function into its object form.
  if (typeof fn === 'function') {
    fn = { taskFunction: fn } satisfies TaskFunctionObject<Data, Response>
  }
  if (typeof fn.taskFunction !== 'function') {
    throw new TypeError('taskFunction property must be a function')
  }
  const taskFunctionProperties = buildTaskFunctionProperties(name, fn)
  // The function is shipped to the workers as its source text.
  const taskFunction = fn.taskFunction.toString()
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionProperties,
    taskFunction
  })
  this.taskFunctions.set(name, fn)
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerWorkerChoiceStrategies()
  )
  return opResult
}
856 public async removeTaskFunction (name
: string): Promise
<boolean> {
857 if (!this.taskFunctions
.has(name
)) {
859 'Cannot remove a task function not handled on the pool side'
862 const opResult
= await this.sendTaskFunctionOperationToWorkers({
863 taskFunctionOperation
: 'remove',
864 taskFunctionProperties
: buildTaskFunctionProperties(
866 this.taskFunctions
.get(name
)
869 this.deleteTaskFunctionWorkerUsages(name
)
870 this.taskFunctions
.delete(name
)
871 this.workerChoiceStrategiesContext
?.syncWorkerChoiceStrategies(
872 this.getWorkerWorkerChoiceStrategies()
/**
 * Lists the task functions properties known to the pool.
 *
 * @returns The task functions properties of the first worker node reporting a non-empty list, an empty array when no worker node does.
 */
public listTaskFunctionsProperties (): TaskFunctionProperties[] {
  const reportingWorkerNode = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionsProperties) &&
      info.taskFunctionsProperties.length > 0
  )
  return reportingWorkerNode?.info.taskFunctionsProperties ?? []
}
891 * Gets task function strategy, if any.
893 * @param name - The task function name.
894 * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
896 private readonly getTaskFunctionWorkerWorkerChoiceStrategy
= (
898 ): WorkerChoiceStrategy
| undefined => {
900 return this.listTaskFunctionsProperties().find(
901 (taskFunctionProperties
: TaskFunctionProperties
) =>
902 taskFunctionProperties
.name
=== name
908 * Gets the worker choice strategies registered in this pool.
910 * @returns The worker choice strategies.
912 private readonly getWorkerWorkerChoiceStrategies
=
913 (): Set
<WorkerChoiceStrategy
> => {
915 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
916 this.opts
.workerChoiceStrategy
!,
917 ...(this.listTaskFunctionsProperties()
919 (taskFunctionProperties
: TaskFunctionProperties
) =>
920 taskFunctionProperties
.strategy
923 (strategy
: WorkerChoiceStrategy
| undefined) => strategy
!= null
924 ) as WorkerChoiceStrategy
[])
/**
 * Sends a 'default' task function operation for the named task function to the workers.
 *
 * @param name - The task function name.
 * @returns The result of the 'default' operation sent to the workers.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  // May be undefined when the name is not in the pool-side map.
  const taskFunctionObject = this.taskFunctions.get(name)
  const taskFunctionProperties = buildTaskFunctionProperties(
    name,
    taskFunctionObject
  )
  return await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionProperties
  })
}
/**
 * Asks every worker node to delete its worker usage for the named task function.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(node => {
    node.deleteTaskFunctionWorkerUsage(name)
  })
}
/**
 * Tells whether a task shall be executed right away on the given worker node:
 * its tasks queue is empty and its executing tasks count is below the configured concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing < this.opts.tasksQueueOptions!.concurrency!
}
955 public async execute (
958 transferList
?: readonly TransferListItem
[]
959 ): Promise
<Response
> {
960 return await new Promise
<Response
>((resolve
, reject
) => {
962 reject(new Error('Cannot execute a task on not started pool'))
965 if (this.destroying
) {
966 reject(new Error('Cannot execute a task on destroying pool'))
969 if (name
!= null && typeof name
!== 'string') {
970 reject(new TypeError('name argument must be a string'))
975 typeof name
=== 'string' &&
976 name
.trim().length
=== 0
978 reject(new TypeError('name argument must not be an empty string'))
981 if (transferList
!= null && !Array.isArray(transferList
)) {
982 reject(new TypeError('transferList argument must be an array'))
985 const timestamp
= performance
.now()
986 const workerNodeKey
= this.chooseWorkerNode(
987 this.getTaskFunctionWorkerWorkerChoiceStrategy(name
)
989 const task
: Task
<Data
> = {
990 name
: name
?? DEFAULT_TASK_NAME
,
991 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
992 data
: data
?? ({} as Data
),
997 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
998 this.promiseResponseMap
.set(task
.taskId
!, {
1002 ...(this.emitter
!= null && {
1003 asyncResource
: new AsyncResource('poolifier:task', {
1004 triggerAsyncId
: this.emitter
.asyncId
,
1005 requireManualDestroy
: true
1010 this.opts
.enableTasksQueue
=== false ||
1011 (this.opts
.enableTasksQueue
=== true &&
1012 this.shallExecuteTask(workerNodeKey
))
1014 this.executeTask(workerNodeKey
, task
)
1016 this.enqueueTask(workerNodeKey
, task
)
1022 * Starts the minimum number of workers.
1024 private startMinimumNumberOfWorkers (): void {
1025 this.startingMinimumNumberOfWorkers
= true
1027 this.workerNodes
.reduce(
1028 (accumulator
, workerNode
) =>
1029 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
1031 ) < this.minimumNumberOfWorkers
1033 this.createAndSetupWorkerNode()
1035 this.startingMinimumNumberOfWorkers
= false
1039 public start (): void {
1041 throw new Error('Cannot start an already started pool')
1043 if (this.starting
) {
1044 throw new Error('Cannot start an already starting pool')
1046 if (this.destroying
) {
1047 throw new Error('Cannot start a destroying pool')
1049 this.starting
= true
1050 this.startMinimumNumberOfWorkers()
1051 this.starting
= false
1056 public async destroy (): Promise
<void> {
1057 if (!this.started
) {
1058 throw new Error('Cannot destroy an already destroyed pool')
1060 if (this.starting
) {
1061 throw new Error('Cannot destroy an starting pool')
1063 if (this.destroying
) {
1064 throw new Error('Cannot destroy an already destroying pool')
1066 this.destroying
= true
1068 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
1069 await this.destroyWorkerNode(workerNodeKey
)
1072 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
1073 this.emitter
?.emitDestroy()
1074 this.readyEventEmitted
= false
1075 this.destroying
= false
1076 this.started
= false
1079 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
1080 await new Promise
<void>((resolve
, reject
) => {
1081 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1082 if (this.workerNodes
[workerNodeKey
] == null) {
1086 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1087 this.checkMessageWorkerId(message
)
1088 if (message
.kill
=== 'success') {
1090 } else if (message
.kill
=== 'failure') {
1093 `Kill message handling failed on worker ${message.workerId}`
1098 // FIXME: should be registered only once
1099 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1100 this.sendToWorker(workerNodeKey
, { kill
: true })
1105 * Terminates the worker node given its worker node key.
1107 * @param workerNodeKey - The worker node key.
1109 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1110 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1111 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1112 const workerNode
= this.workerNodes
[workerNodeKey
]
1113 await waitWorkerNodeEvents(
1117 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1118 getDefaultTasksQueueOptions(
1119 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1120 ).tasksFinishedTimeout
1122 await this.sendKillMessageToWorker(workerNodeKey
)
1123 await workerNode
.terminate()
/**
 * Setup hook meant to run code before the worker nodes are created in the abstract constructor.
 * Does nothing by default; can be overridden.
 */
protected setupHook (): void {
  // No-op by design.
}
1137 * Returns whether the worker is the main worker or not.
1139 * @returns `true` if the worker is the main worker, `false` otherwise.
1141 protected abstract isMain (): boolean
1144 * Hook executed before the worker task execution.
1145 * Can be overridden.
1147 * @param workerNodeKey - The worker node key.
1148 * @param task - The task to execute.
1150 protected beforeTaskExecutionHook (
1151 workerNodeKey
: number,
1154 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1155 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1156 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1157 ++workerUsage
.tasks
.executing
1158 updateWaitTimeWorkerUsage(
1159 this.workerChoiceStrategiesContext
,
1165 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1166 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1167 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(task
.name
!) !=
1170 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1171 const taskFunctionWorkerUsage
= this.workerNodes
[
1173 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1174 ].getTaskFunctionWorkerUsage(task
.name
!)!
1175 ++taskFunctionWorkerUsage
.tasks
.executing
1176 updateWaitTimeWorkerUsage(
1177 this.workerChoiceStrategiesContext
,
1178 taskFunctionWorkerUsage
,
// Post-execution bookkeeping: folds the worker response message into the
// worker node usage statistics and, when applicable, into the usage statistics
// of the task function named in message.taskPerformance.
// NOTE(review): this excerpt omits some original lines (delimiters and trailing
// call arguments); the comments below describe only what is visible.
1185 * Hook executed after the worker task execution.
1186 * Can be overridden.
1188 * @param workerNodeKey - The worker node key.
1189 * @param message - The received message.
1191 protected afterTaskExecutionHook (
1192 workerNodeKey
: number,
1193 message
: MessageValue
<Response
>
// Becomes true as soon as either statistics branch below ran, so that the
// worker choice strategies context is updated once at the end.
1195 let needWorkerChoiceStrategyUpdate
= false
// Worker-level statistics: skipped when the worker node (or its usage) is
// no longer available under this key.
1196 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1197 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1198 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1199 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1200 updateRunTimeWorkerUsage(
1201 this.workerChoiceStrategiesContext
,
1205 updateEluWorkerUsage(
1206 this.workerChoiceStrategiesContext
,
1210 needWorkerChoiceStrategyUpdate
= true
// Task-function-level statistics: same three updates, applied to the usage
// object returned by getTaskFunctionWorkerUsage() for the executed task name.
1213 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1214 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1215 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1216 message
.taskPerformance
!.name
1219 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1220 const taskFunctionWorkerUsage
= this.workerNodes
[
1222 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1223 ].getTaskFunctionWorkerUsage(message
.taskPerformance
!.name
)!
1224 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1225 updateRunTimeWorkerUsage(
1226 this.workerChoiceStrategiesContext
,
1227 taskFunctionWorkerUsage
,
1230 updateEluWorkerUsage(
1231 this.workerChoiceStrategiesContext
,
1232 taskFunctionWorkerUsage
,
1235 needWorkerChoiceStrategyUpdate
= true
// Notify the worker choice strategies context only when statistics changed.
1237 if (needWorkerChoiceStrategyUpdate
) {
1238 this.workerChoiceStrategiesContext
?.update(workerNodeKey
)
/**
 * Tells whether per task function usage statistics shall be maintained for a worker node.
 *
 * That is the case only when the worker node information exists and lists
 * more than two task function properties entries.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionsProperties =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties
  if (!Array.isArray(taskFunctionsProperties)) {
    // Unknown worker node or task functions not reported yet.
    return false
  }
  return taskFunctionsProperties.length > 2
}
/**
 * Picks the worker node that will receive the next task.
 *
 * A dynamic worker node is created first when the pool allows it; its key is
 * returned right away if the strategies policy asks for dynamic worker usage.
 * Otherwise the choice is delegated to the worker choice strategies context.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @returns The chosen worker node key
 */
private chooseWorkerNode (
  workerChoiceStrategy?: WorkerChoiceStrategy
): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const useDynamicWorker =
      this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
      true
    if (useDynamicWorker) {
      return dynamicWorkerNodeKey
    }
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
}
// Abstract: concrete pools decide here whether a dynamic worker node may be
// created; chooseWorkerNode() consults it before selecting a worker node.
1280 * Conditions for dynamic worker creation.
1282 * @returns Whether to create a dynamic worker or not.
1284 protected abstract shallCreateDynamicWorker (): boolean
// Abstract transport primitive implemented by concrete pools. In this file it
// carries tasks (executeTask passes the task and its transferList) and the
// statistics message (sendStatisticsMessageToWorker).
1287 * Sends a message to worker given its worker node key.
1289 * @param workerNodeKey - The worker node key.
1290 * @param message - The message.
1291 * @param transferList - The optional array of transferable objects.
1293 protected abstract sendToWorker (
1294 workerNodeKey
: number,
1295 message
: MessageValue
<Data
>,
1296 transferList
?: readonly TransferListItem
[]
// Builds a worker node, wires the user supplied and internal worker event
// handlers, adds the node to the pool and runs afterWorkerNodeSetup() on it.
// NOTE(review): the event name arguments of the registerWorkerEventHandler()
// calls and parts of the guard conditions are not visible in this excerpt.
1300 * Creates a new, completely set up worker node.
1302 * @returns New, completely set up worker node key.
1304 protected createAndSetupWorkerNode (): number {
1305 const workerNode
= this.createWorkerNode()
// User supplied handlers; EMPTY_FUNCTION is used when an option is not set.
1306 workerNode
.registerWorkerEventHandler(
1308 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1310 workerNode
.registerWorkerEventHandler(
1312 this.opts
.messageHandler
?? EMPTY_FUNCTION
1314 workerNode
.registerWorkerEventHandler(
1316 this.opts
.errorHandler
?? EMPTY_FUNCTION
// Internal one-shot 'error' handler: flags the node as not ready, forwards the
// error as a pool error event, optionally replaces the worker, optionally
// redistributes its queued tasks, then terminates the node.
1318 workerNode
.registerOnceWorkerEventHandler('error', (error
: Error) => {
1319 workerNode
.info
.ready
= false
1320 this.emitter
?.emit(PoolEvents
.error
, error
)
// Replacement is conditioned on the restartWorkerOnError option: a dynamic
// node is replaced by a new dynamic node, otherwise the minimum number of
// workers is restarted unless that is already in progress.
1324 this.opts
.restartWorkerOnError
=== true
1326 if (workerNode
.info
.dynamic
) {
1327 this.createAndSetupDynamicWorkerNode()
1328 } else if (!this.startingMinimumNumberOfWorkers
) {
1329 this.startMinimumNumberOfWorkers()
// With the tasks queue enabled, the queued tasks of the failed node are handed
// to redistributeQueuedTasks(); indexOf() yields -1 if the node is already gone.
1335 this.opts
.enableTasksQueue
=== true
1337 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
// Termination failures are reported as pool error events rather than thrown.
1339 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1340 workerNode
?.terminate().catch((error
: unknown
) => {
1341 this.emitter
?.emit(PoolEvents
.error
, error
)
1344 workerNode
.registerWorkerEventHandler(
1346 this.opts
.exitHandler
?? EMPTY_FUNCTION
// Internal one-shot 'exit' handler: removes the node from the pool and may
// restart the minimum number of workers.
1348 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1349 this.removeWorkerNode(workerNode
)
1352 !this.startingMinimumNumberOfWorkers
&&
1355 this.startMinimumNumberOfWorkers()
// Register the node in the pool, then run the post-setup hook with its key.
1358 const workerNodeKey
= this.addWorkerNode(workerNode
)
1359 this.afterWorkerNodeSetup(workerNodeKey
)
1360 return workerNodeKey
// Creates a regular worker node via createAndSetupWorkerNode(), then adds the
// dynamic-worker specifics: a kill message listener, registration of the pool
// task functions on the new worker, and the dynamic/ready flags.
// NOTE(review): several original lines (call arguments, `if (` openers and
// compared literals) are not visible in this excerpt.
1364 * Creates a new, completely set up dynamic worker node.
1366 * @returns New, completely set up dynamic worker node key.
1368 protected createAndSetupDynamicWorkerNode (): number {
1369 const workerNodeKey
= this.createAndSetupWorkerNode()
// Message listener dedicated to kill requests coming from the worker.
1370 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1371 this.checkMessageWorkerId(message
)
// The key is resolved again inside the listener instead of reusing the outer
// workerNodeKey.
1372 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
// NOTE(review): workerUsage is read with `?.` and may therefore be undefined,
// yet it is dereferenced without a guard in the SOFT kill conditions below;
// confirm the worker node always exists when a SOFT kill message arrives.
1375 const workerUsage
= this.workerNodes
[localWorkerNodeKey
]?.usage
// HARD kill: always honoured. SOFT kill: honoured only when the worker node
// executes no task and, with the tasks queue enabled, has no queued task.
1376 // Kill message received from worker
1378 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1379 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1380 ((this.opts
.enableTasksQueue
=== false &&
1381 workerUsage
.tasks
.executing
=== 0) ||
1382 (this.opts
.enableTasksQueue
=== true &&
1383 workerUsage
.tasks
.executing
=== 0 &&
1384 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1386 // Flag the worker node as not ready immediately
1387 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
// Destruction is asynchronous; failures are reported as pool error events.
1388 this.destroyWorkerNode(localWorkerNodeKey
).catch((error
: unknown
) => {
1389 this.emitter
?.emit(PoolEvents
.error
, error
)
// NOTE(review): the payload of this message is not visible in this excerpt.
1393 this.sendToWorker(workerNodeKey
, {
// Registers every task function known to the pool on the new worker with an
// 'add' operation; the function is sent as source text via toString().
1396 if (this.taskFunctions
.size
> 0) {
1397 for (const [taskFunctionName
, taskFunctionObject
] of this.taskFunctions
) {
1398 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1399 taskFunctionOperation
: 'add',
1400 taskFunctionProperties
: buildTaskFunctionProperties(
1404 taskFunction
: taskFunctionObject
.taskFunction
.toString()
1405 }).catch((error
: unknown
) => {
1406 this.emitter
?.emit(PoolEvents
.error
, error
)
// Mark the node as dynamic; it is flagged ready depending on the
// dynamicWorkerReady / dynamicWorkerUsage strategy policy values.
1410 const workerNode
= this.workerNodes
[workerNodeKey
]
1411 workerNode
.info
.dynamic
= true
1413 this.workerChoiceStrategiesContext
?.getPolicy().dynamicWorkerReady
===
1415 this.workerChoiceStrategiesContext
?.getPolicy().dynamicWorkerUsage
===
1418 workerNode
.info
.ready
= true
1420 this.checkAndEmitDynamicWorkerCreationEvents()
1421 return workerNodeKey
// Abstract: persistent message subscription implemented by concrete pools.
// Used in this file by afterWorkerNodeSetup() and
// createAndSetupDynamicWorkerNode().
1425 * Registers a listener callback on the worker given its worker node key.
1427 * @param workerNodeKey - The worker node key.
1428 * @param listener - The message listener callback.
1430 protected abstract registerWorkerMessageListener
<
1431 Message
extends Data
| Response
1433 workerNodeKey
: number,
1434 listener
: (message
: MessageValue
<Message
>) => void
// Abstract: same signature as registerWorkerMessageListener; per its
// description the listener is registered once.
1438 * Registers once a listener callback on the worker given its worker node key.
1440 * @param workerNodeKey - The worker node key.
1441 * @param listener - The message listener callback.
1443 protected abstract registerOnceWorkerMessageListener
<
1444 Message
extends Data
| Response
1446 workerNodeKey
: number,
1447 listener
: (message
: MessageValue
<Message
>) => void
// Abstract: counterpart of registerWorkerMessageListener, with the same
// signature, used to remove a previously registered listener.
1451 * Deregisters a listener callback on the worker given its worker node key.
1453 * @param workerNodeKey - The worker node key.
1454 * @param listener - The message listener callback.
1456 protected abstract deregisterWorkerMessageListener
<
1457 Message
extends Data
| Response
1459 workerNodeKey
: number,
1460 listener
: (message
: MessageValue
<Message
>) => void
// Post-creation wiring of a worker node: message listener, startup and
// statistics messages, then the optional task stealing event handlers.
// NOTE(review): the first argument of registerWorkerMessageListener() and the
// event names passed to `.on(` are not visible in this excerpt.
1464 * Method hooked up after a worker node has been newly created.
1465 * Can be overridden.
1467 * @param workerNodeKey - The newly created worker node key.
1469 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1470 // Listen to worker messages.
1471 this.registerWorkerMessageListener(
1473 this.workerMessageListener
1475 // Send the startup message to worker.
1476 this.sendStartupMessageToWorker(workerNodeKey
)
1477 // Send the statistics message to worker.
1478 this.sendStatisticsMessageToWorker(workerNodeKey
)
// Task stealing handlers are attached only when the tasks queue is enabled
// and the matching tasksQueueOptions flag is explicitly true.
1479 if (this.opts
.enableTasksQueue
=== true) {
1480 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1481 this.workerNodes
[workerNodeKey
].on(
1483 this.handleWorkerNodeIdleEvent
1486 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1487 this.workerNodes
[workerNodeKey
].on(
1489 this.handleWorkerNodeBackPressureEvent
// Abstract: implemented by concrete pools; afterWorkerNodeSetup() calls it
// right after registering the worker message listener.
1496 * Sends the startup message to worker given its worker node key.
1498 * @param workerNodeKey - The worker node key.
1500 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
// Sends the worker two boolean flags derived from the task statistics
// requirements of the worker choice strategies context: the `aggregate` flag
// of runTime and the `aggregate` flag of elu.
// NOTE(review): the property names wrapping these two values in the message
// are not visible in this excerpt.
1503 * Sends the statistics message to worker given its worker node key.
1505 * @param workerNodeKey - The worker node key.
1507 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1508 this.sendToWorker(workerNodeKey
, {
// Each flag falls back to false when the strategies context is undefined.
1511 this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
1512 .runTime
.aggregate
?? false,
1514 this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
1515 .elu
.aggregate
?? false
/**
 * Tells whether task stealing is pointless in the current pool state.
 *
 * @returns `true` if the pool has at most one worker node or no queued tasks, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    // No other worker node to steal from.
    return true
  }
  return this.info.queuedTasks === 0
}
/**
 * Dispatches a task to a worker node: executes it right away when the worker
 * node shall execute it, enqueues it on that worker node otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
// Empties the tasks queue of the given worker node by moving its queued tasks,
// one at a time, to a destination worker node chosen with a reduce() over the
// pool worker nodes.
// NOTE(review): the early-return body, the tail of the reducer (its result
// branches and initial value) and the call receiving the destination key and
// the dequeued task are not visible in this excerpt.
1532 private redistributeQueuedTasks (workerNodeKey
: number): void {
// Nothing to do for an unknown worker node (-1) or when stealing is pointless.
1533 if (workerNodeKey
=== -1 || this.cannotStealTask()) {
1536 while (this.tasksQueueSize(workerNodeKey
) > 0) {
// The reducer compares ready worker nodes by their number of queued tasks
// against the current candidate.
// NOTE(review): the reducer's third parameter `workerNodeKey` shadows the
// method parameter of the same name inside the callback.
1537 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1538 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1539 return workerNode
.info
.ready
&&
1540 workerNode
.usage
.tasks
.queued
<
1541 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1548 destinationWorkerNodeKey
,
1549 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1550 this.dequeueTask(workerNodeKey
)!
/**
 * Counts one more stolen task for a worker node and, when task function
 * usage is tracked on it, for the given task function as well.
 *
 * @param workerNodeKey - The key of the worker node that stole the task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (stealingWorkerNode?.usage != null) {
    stealingWorkerNode.usage.tasks.stolen += 1
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.stolen += 1
  }
}
/**
 * Counts one more sequentially stolen task for a worker node, if the worker
 * node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage == null) {
    return
  }
  workerUsage.tasks.sequentiallyStolen += 1
}
/**
 * Counts one more sequentially stolen task for a task function of a worker
 * node, when task function usage is tracked on that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    targetWorkerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen += 1
  }
}
/**
 * Resets the sequentially stolen tasks counter of a worker node, if the
 * worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage == null) {
    return
  }
  workerUsage.tasks.sequentiallyStolen = 0
}
/**
 * Resets the sequentially stolen tasks counter of a task function of a worker
 * node, when task function usage is tracked on that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    targetWorkerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
// Handler for the idle event of a worker node: lets the idle worker node steal
// one queued task from another worker node, maintains the "sequentially
// stolen" counters, then re-invokes itself after a delay computed from the
// sequentially stolen count.
// NOTE(review): several original lines (`throw`/`return` statements, `if (`
// openers, call arguments and the promise chaining between sleep() and the
// recursive call) are not visible in this excerpt.
1627 private readonly handleWorkerNodeIdleEvent
= (
1628 eventDetail
: WorkerNodeEventDetail
,
1629 previousStolenTask
?: Task
<Data
>
// The event detail must carry the key of the idle worker node.
1631 const { workerNodeKey
} = eventDetail
1632 if (workerNodeKey
== null) {
1634 "WorkerNode event detail 'workerNodeKey' property must be defined"
1637 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
// Stop condition: stealing is pointless, or the number of stealing worker
// nodes exceeds half of the pool size (rounded down). When a previous steal
// happened in this chain, the stealing flag is cleared.
1639 this.cannotStealTask() ||
1640 (this.info
.stealingWorkerNodes
?? 0) >
1641 Math.floor(this.workerNodes
.length
/ 2)
1643 if (workerInfo
!= null && previousStolenTask
!= null) {
1644 workerInfo
.stealing
= false
1648 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
// Second stop condition: the worker node stole before and is now busy again
// (executing or holding queued tasks). The stealing flag and all sequentially
// stolen counters (per task function and worker level) are reset.
1650 workerInfo
!= null &&
1651 previousStolenTask
!= null &&
1652 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1653 (workerNodeTasksUsage
.executing
> 0 ||
1654 this.tasksQueueSize(workerNodeKey
) > 0)
1656 workerInfo
.stealing
= false
1657 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1658 for (const taskFunctionProperties
of workerInfo
.taskFunctionsProperties
!) {
1659 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1661 taskFunctionProperties
.name
1664 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1667 if (workerInfo
== null) {
1669 `Worker node with key '${workerNodeKey}' not found in pool`
// Steal attempt: the worker node is flagged as stealing before the attempt.
1672 workerInfo
.stealing
= true
1673 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
// Per task function bookkeeping: the counter is incremented on a first steal
// or when the same task function is stolen again in a row; the other visible
// branch resets it.
1675 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1678 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1679 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1681 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1682 ].getTaskFunctionWorkerUsage(stolenTask
.name
!)!.tasks
1684 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1685 (previousStolenTask
!= null &&
1686 previousStolenTask
.name
=== stolenTask
.name
&&
1687 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1689 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1691 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1695 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1697 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
// Back-off then recursion: the delay is derived from the worker level
// sequentially stolen counter; the stolen task becomes previousStolenTask of
// the next invocation. Failures are reported as pool error events.
1702 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1704 this.handleWorkerNodeIdleEvent(eventDetail
, stolenTask
)
1707 .catch((error
: unknown
) => {
1708 this.emitter
?.emit(PoolEvents
.error
, error
)
/**
 * Steals one queued task for the given worker node.
 *
 * The source is the ready, non-stealing worker node, other than the stealing
 * one, that has the most queued tasks. The stolen task is handed to the
 * stealing worker node and its stolen tasks statistics are updated.
 *
 * @param workerNodeKey - The key of the worker node stealing the task.
 * @returns The stolen task, `undefined` if no worker node had a task to steal.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  // Sort a shallow copy so the pool order, and thus the worker node keys,
  // stay untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  // The stealing worker node is excluded by identity: positions in the sorted
  // copy are not pool worker node keys and must not be compared with one.
  const sourceWorkerNode = workerNodes.find(
    candidateWorkerNode =>
      candidateWorkerNode.info.ready &&
      !candidateWorkerNode.info.stealing &&
      candidateWorkerNode !== stealingWorkerNode &&
      candidateWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return undefined
  }
  // The source was selected with queued > 0, hence the non-null assertion.
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const task = sourceWorkerNode.popTask()!
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
  return task
}
/**
 * Handles the back pressure event of a worker node by letting the other
 * worker nodes steal tasks from its queue.
 *
 * Worker nodes are visited from the least to the most loaded (queued tasks).
 * Each ready, non-stealing worker node other than the source, whose queue is
 * below the configured queue size minus one, receives one task as long as the
 * source still has queued tasks.
 *
 * @param eventDetail - The worker node event detail.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If a destination worker node is not found in the pool.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    // A queue that small never leaves room below the stealing threshold.
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sort a shallow copy so the pool order, and thus the worker node keys,
  // stay untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      // Positions in the sorted copy are not pool worker node keys: resolve
      // the real key so the task goes to the node that passed the checks.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      workerInfo.stealing = true
      // The source still has queued tasks, hence the non-null assertion.
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      workerInfo.stealing = false
    }
  }
}
/**
 * Message listener registered on each worker.
 *
 * Routes a worker message to the matching handler: worker ready response,
 * task function properties update or task execution response.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionsProperties } = message
  if (taskFunctionsProperties != null) {
    if (ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    // Task function properties message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    if (workerInfo != null) {
      workerInfo.taskFunctionsProperties = taskFunctionsProperties
    }
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
  }
}
/**
 * Emits the pool `ready` event with the pool information the first time the
 * pool is found ready, and records that the event was emitted.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
/**
 * Handles the ready response of a worker: records its ready state and its
 * task function properties on the worker node, then checks whether the pool
 * `ready` event shall be emitted.
 *
 * @param message - The ready response message received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker did not report itself as ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionsProperties } = message
  if (ready !== true) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const readyWorkerNode = this.workerNodes[readyWorkerNodeKey]
  readyWorkerNode.info.ready = ready
  readyWorkerNode.info.taskFunctionsProperties = taskFunctionsProperties
  this.checkAndEmitReadyEvent()
}
// Handles a task execution response: settles the promise registered for the
// task id, runs the after-execution hook, removes the promise map entry,
// notifies the worker node and, with the tasks queue enabled, runs the next
// queued task or emits the worker node 'idle' event.
// NOTE(review): several original lines (`if (` openers, the arguments of the
// rejecting runInAsyncScope() call, one condition and the 'idle' event
// payload) are not visible in this excerpt.
1834 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1835 const { workerId
, taskId
, workerError
, data
} = message
// Responses whose task id has no registered promise are ignored.
1836 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1837 const promiseResponse
= this.promiseResponseMap
.get(taskId
!)
1838 if (promiseResponse
!= null) {
1839 const { resolve
, reject
, workerNodeKey
, asyncResource
} = promiseResponse
1840 const workerNode
= this.workerNodes
[workerNodeKey
]
// Failure path: a taskError pool event is emitted, then the promise is
// rejected, inside the async resource scope when one was recorded. The
// visible direct call rejects with workerError.message.
1841 if (workerError
!= null) {
1842 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1843 asyncResource
!= null
1844 ? asyncResource
.runInAsyncScope(
1849 : reject(workerError
.message
)
// Success path: the promise is resolved with the response data, inside the
// async resource scope when one was recorded.
1851 asyncResource
!= null
1852 ? asyncResource
.runInAsyncScope(resolve
, this.emitter
, data
)
1853 : resolve(data
as Response
)
// Common epilogue, in this order: destroy the async resource, update usage
// statistics, drop the map entry, emit 'taskFinished' on the worker node.
1855 asyncResource
?.emitDestroy()
1856 this.afterTaskExecutionHook(workerNodeKey
, message
)
1857 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1858 this.promiseResponseMap
.delete(taskId
!)
1859 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1860 workerNode
?.emit('taskFinished', taskId
)
// Tasks queue follow-up.
1862 this.opts
.enableTasksQueue
=== true &&
1864 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1867 const workerNodeTasksUsage
= workerNode
.usage
.tasks
// Run the next queued task while the worker node executes fewer tasks than
// the configured concurrency.
1869 this.tasksQueueSize(workerNodeKey
) > 0 &&
1870 workerNodeTasksUsage
.executing
<
1871 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1872 this.opts
.tasksQueueOptions
!.concurrency
!
1874 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1875 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
// Emit 'idle' on the worker node when it executes nothing, has nothing queued
// and has no sequentially stolen task recorded.
1878 workerNodeTasksUsage
.executing
=== 0 &&
1879 this.tasksQueueSize(workerNodeKey
) === 0 &&
1880 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1882 workerNode
.emit('idle', {
// Emits the pool `busy` event with the pool information; executeTask() calls
// it after each task is sent to a worker.
// NOTE(review): the condition guarding the emission is not visible in this
// excerpt; presumably the pool busy state, confirm.
1891 private checkAndEmitTaskExecutionEvents (): void {
1893 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
/**
 * Emits the pool `backPressure` event with the pool information when the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
// Abstract: implemented by concrete pools; createAndSetupDynamicWorkerNode()
// calls it once the dynamic worker node is set up.
1904 * Emits dynamic worker creation events.
1906 protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
 * Looks up the information of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if no worker node exists under the given key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  return workerNode == null ? undefined : workerNode.info
}
// Instantiates a WorkerNode from the pool options.
// NOTE(review): the leading constructor arguments, other option properties and
// the final return statement are not visible in this excerpt.
1919 * Creates a worker node.
1921 * @returns The created worker node.
1923 private createWorkerNode (): IWorkerNode
<Worker
, Data
> {
1924 const workerNode
= new WorkerNode
<Worker
, Data
>(
1929 workerOptions
: this.opts
.workerOptions
,
// Back pressure size: the configured tasks queue size when set, otherwise a
// value taken from getDefaultTasksQueueOptions() called with the maximum
// number of workers, or the minimum when no maximum is defined.
1930 tasksQueueBackPressureSize
:
1931 this.opts
.tasksQueueOptions
?.size
??
1932 getDefaultTasksQueueOptions(
1933 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1937 // Flag the worker node as ready at pool startup.
1938 if (this.starting
) {
1939 workerNode
.info
.ready
= true
/**
 * Appends a worker node to the pool worker nodes and returns its key.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const wasAdded = addedWorkerNodeKey !== -1
  if (!wasAdded) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
// Emits the pool `empty` event with the pool information and re-arms the
// `ready` event by clearing readyEventEmitted; removeWorkerNode() calls it
// after each removal attempt.
// NOTE(review): the condition guarding the emission is not visible in this
// excerpt; presumably the pool being empty, confirm.
1960 private checkAndEmitEmptyEvent (): void {
1962 this.emitter
?.emit(PoolEvents
.empty
, this.info
)
1963 this.readyEventEmitted
= false
/**
 * Removes a worker node from the pool worker nodes.
 *
 * When the worker node is found, it is also removed from the worker choice
 * strategies context under the key it had. The empty event check runs in
 * every case.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const isInPool = removedWorkerNodeKey !== -1
  if (isInPool) {
    this.workerNodes.splice(removedWorkerNodeKey, 1)
    this.workerChoiceStrategiesContext?.remove(removedWorkerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
/**
 * Flags a worker node as not ready, if its information exists.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
// Pool level back pressure check, only meaningful with the tasks queue
// enabled: searches for a worker node that does NOT have back pressure.
// NOTE(review): the comparison applied to the findIndex() result is not
// visible in this excerpt; presumably `=== -1`, i.e. every worker node has
// back pressure. Confirm.
1988 private hasBackPressure (): boolean {
1990 this.opts
.enableTasksQueue
=== true &&
1991 this.workerNodes
.findIndex(
1992 workerNode
=> !workerNode
.hasBackPressure()
/**
 * Runs a task on a worker node.
 *
 * The before-execution hook runs first, then the task is sent to the worker
 * together with its transfer list, and finally the task execution events are
 * checked.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues a task on a worker node, then checks the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
/**
 * Dequeues the next task of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` as returned by the worker node.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const queriedWorkerNode = this.workerNodes[workerNodeKey]
  return queriedWorkerNode.tasksQueueSize()
}
// Drains the tasks queue of a worker node by executing every queued task in
// dequeue order, then clears the queue on the worker node.
// NOTE(review): the update of the flushedTasks counter and the return
// statement are not visible in this excerpt; presumably the number of
// executed tasks is returned, confirm.
2023 protected flushTasksQueue (workerNodeKey
: number): number {
2024 let flushedTasks
= 0
// The loop guard ensures a task is queued, hence the non-null assertion.
2025 while (this.tasksQueueSize(workerNodeKey
) > 0) {
2026 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
2027 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
2030 this.workerNodes
[workerNodeKey
].clearTasksQueue()
/**
 * Flushes the tasks queue of every worker node of the pool, in worker node
 * key order.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}