1 import { AsyncResource
} from
'node:async_hooks'
2 import { randomUUID
} from
'node:crypto'
3 import { EventEmitterAsyncResource
} from
'node:events'
4 import { performance
} from
'node:perf_hooks'
5 import type { TransferListItem
} from
'node:worker_threads'
9 PromiseResponseWrapper
,
11 TaskFunctionProperties
12 } from
'../utility-types.js'
15 buildTaskFunctionProperties
,
30 } from
'../worker/task-functions.js'
31 import { KillBehaviors
} from
'../worker/worker-options.js'
39 type TasksQueueOptions
43 WorkerChoiceStrategies
,
44 type WorkerChoiceStrategy
,
45 type WorkerChoiceStrategyOptions
46 } from
'./selection-strategies/selection-strategies-types.js'
47 import { WorkerChoiceStrategiesContext
} from
'./selection-strategies/worker-choice-strategies-context.js'
50 checkValidTasksQueueOptions
,
51 checkValidWorkerChoiceStrategy
,
52 getDefaultTasksQueueOptions
,
54 updateRunTimeWorkerUsage
,
55 updateTaskStatisticsWorkerUsage
,
56 updateWaitTimeWorkerUsage
,
59 import { version
} from
'./version.js'
64 WorkerNodeEventDetail
,
67 import { WorkerNode
} from
'./worker-node.js'
70 * Base class that implements some shared logic for all poolifier pools.
72 * @typeParam Worker - Type of worker which manages this pool.
73 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
74 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
76 export abstract class AbstractPool
<
77 Worker
extends IWorker
,
80 > implements IPool
<Worker
, Data
, Response
> {
82 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
85 public emitter
?: EventEmitterAsyncResource
88 * The task execution response promise map:
89 * - `key`: The message id of each submitted task.
90 * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
92 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
94 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
95 new Map
<string, PromiseResponseWrapper
<Response
>>()
98 * Worker choice strategies context referencing worker choice algorithms implementation.
100 protected workerChoiceStrategiesContext
?: WorkerChoiceStrategiesContext
<
107 * The task functions added at runtime map:
108 * - `key`: The task function name.
109 * - `value`: The task function object.
111 private readonly taskFunctions
: Map
<
113 TaskFunctionObject
<Data
, Response
>
117 * Whether the pool is started or not.
119 private started
: boolean
121 * Whether the pool is starting or not.
123 private starting
: boolean
125 * Whether the pool is destroying or not.
127 private destroying
: boolean
129 * Whether the minimum number of workers is starting or not.
131 private startingMinimumNumberOfWorkers
: boolean
133 * Whether the pool ready event has been emitted or not.
135 private readyEventEmitted
: boolean
137 * The start timestamp of the pool.
139 private readonly startTimestamp
142 * Constructs a new poolifier pool.
144 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
145 * @param filePath - Path to the worker file.
146 * @param opts - Options for the pool.
147 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
150 protected readonly minimumNumberOfWorkers
: number,
151 protected readonly filePath
: string,
152 protected readonly opts
: PoolOptions
<Worker
>,
153 protected readonly maximumNumberOfWorkers
?: number
155 if (!this.isMain()) {
157 'Cannot start a pool from a worker with the same type as the pool'
161 checkFilePath(this.filePath
)
162 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
163 this.checkPoolOptions(this.opts
)
165 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
166 this.executeTask
= this.executeTask
.bind(this)
167 this.enqueueTask
= this.enqueueTask
.bind(this)
169 if (this.opts
.enableEvents
=== true) {
170 this.initializeEventEmitter()
172 this.workerChoiceStrategiesContext
= new WorkerChoiceStrategiesContext
<
178 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
179 [this.opts
.workerChoiceStrategy
!],
180 this.opts
.workerChoiceStrategyOptions
185 this.taskFunctions
= new Map
<string, TaskFunctionObject
<Data
, Response
>>()
188 this.starting
= false
189 this.destroying
= false
190 this.readyEventEmitted
= false
191 this.startingMinimumNumberOfWorkers
= false
192 if (this.opts
.startWorkers
=== true) {
196 this.startTimestamp
= performance
.now()
199 private checkPoolType (): void {
200 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
202 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
207 private checkMinimumNumberOfWorkers (
208 minimumNumberOfWorkers
: number | undefined
210 if (minimumNumberOfWorkers
== null) {
212 'Cannot instantiate a pool without specifying the number of workers'
214 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
216 'Cannot instantiate a pool with a non safe integer number of workers'
218 } else if (minimumNumberOfWorkers
< 0) {
219 throw new RangeError(
220 'Cannot instantiate a pool with a negative number of workers'
222 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
223 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
/**
 * Validates the given pool options and writes the effective values on `this.opts`.
 *
 * Defaults applied when an option is nullish: `startWorkers` is `true`,
 * `workerChoiceStrategy` is `WorkerChoiceStrategies.ROUND_ROBIN`,
 * `restartWorkerOnError` is `true`, `enableEvents` is `true` and
 * `enableTasksQueue` is `false`.
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validated first, then defaulted.
  const strategy = opts.workerChoiceStrategy
  checkValidWorkerChoiceStrategy(strategy)
  this.opts.workerChoiceStrategy =
    strategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  // Worker choice strategy options: only stored when provided.
  const strategyOptions = opts.workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(strategyOptions)
  if (strategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = strategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue options are only validated and built when the queue is enabled.
  const tasksQueueEnabled = opts.enableTasksQueue ?? false
  this.opts.enableTasksQueue = tasksQueueEnabled
  if (tasksQueueEnabled) {
    const queueOptions = opts.tasksQueueOptions
    checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
  }
}
253 private checkValidWorkerChoiceStrategyOptions (
254 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
257 workerChoiceStrategyOptions
!= null &&
258 !isPlainObject(workerChoiceStrategyOptions
)
261 'Invalid worker choice strategy options: must be a plain object'
265 workerChoiceStrategyOptions
?.weights
!= null &&
266 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
267 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
270 'Invalid worker choice strategy options: must have a weight for each worker node'
274 workerChoiceStrategyOptions
?.measurement
!= null &&
275 !Object.values(Measurements
).includes(
276 workerChoiceStrategyOptions
.measurement
280 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
/**
 * Creates the pool event emitter and stores it in `this.emitter`.
 *
 * The emitter name is built from the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
292 public get
info (): PoolInfo
{
297 started
: this.started
,
299 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
300 defaultStrategy
: this.opts
.workerChoiceStrategy
!,
301 strategyRetries
: this.workerChoiceStrategiesContext
?.retriesCount
?? 0,
302 minSize
: this.minimumNumberOfWorkers
,
303 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
304 ...(this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
305 .runTime
.aggregate
=== true &&
306 this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
307 .waitTime
.aggregate
&& {
308 utilization
: round(this.utilization
)
310 workerNodes
: this.workerNodes
.length
,
311 idleWorkerNodes
: this.workerNodes
.reduce(
312 (accumulator
, workerNode
) =>
313 workerNode
.usage
.tasks
.executing
=== 0
318 ...(this.opts
.enableTasksQueue
=== true && {
319 stealingWorkerNodes
: this.workerNodes
.reduce(
320 (accumulator
, workerNode
) =>
321 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
325 busyWorkerNodes
: this.workerNodes
.reduce(
326 (accumulator
, _workerNode
, workerNodeKey
) =>
327 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
330 executedTasks
: this.workerNodes
.reduce(
331 (accumulator
, workerNode
) =>
332 accumulator
+ workerNode
.usage
.tasks
.executed
,
335 executingTasks
: this.workerNodes
.reduce(
336 (accumulator
, workerNode
) =>
337 accumulator
+ workerNode
.usage
.tasks
.executing
,
340 ...(this.opts
.enableTasksQueue
=== true && {
341 queuedTasks
: this.workerNodes
.reduce(
342 (accumulator
, workerNode
) =>
343 accumulator
+ workerNode
.usage
.tasks
.queued
,
347 ...(this.opts
.enableTasksQueue
=== true && {
348 maxQueuedTasks
: this.workerNodes
.reduce(
349 (accumulator
, workerNode
) =>
350 accumulator
+ (workerNode
.usage
.tasks
.maxQueued
?? 0),
354 ...(this.opts
.enableTasksQueue
=== true && {
355 backPressure
: this.hasBackPressure()
357 ...(this.opts
.enableTasksQueue
=== true && {
358 stolenTasks
: this.workerNodes
.reduce(
359 (accumulator
, workerNode
) =>
360 accumulator
+ workerNode
.usage
.tasks
.stolen
,
364 failedTasks
: this.workerNodes
.reduce(
365 (accumulator
, workerNode
) =>
366 accumulator
+ workerNode
.usage
.tasks
.failed
,
369 ...(this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
370 .runTime
.aggregate
=== true && {
374 ...this.workerNodes
.map(
375 workerNode
=> workerNode
.usage
.runTime
.minimum
?? Infinity
381 ...this.workerNodes
.map(
382 workerNode
=> workerNode
.usage
.runTime
.maximum
?? -Infinity
386 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
387 .runTime
.average
&& {
390 this.workerNodes
.reduce
<number[]>(
391 (accumulator
, workerNode
) =>
392 accumulator
.concat(workerNode
.usage
.runTime
.history
),
398 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
402 this.workerNodes
.reduce
<number[]>(
403 (accumulator
, workerNode
) =>
404 accumulator
.concat(workerNode
.usage
.runTime
.history
),
412 ...(this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
413 .waitTime
.aggregate
=== true && {
417 ...this.workerNodes
.map(
418 workerNode
=> workerNode
.usage
.waitTime
.minimum
?? Infinity
424 ...this.workerNodes
.map(
425 workerNode
=> workerNode
.usage
.waitTime
.maximum
?? -Infinity
429 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
430 .waitTime
.average
&& {
433 this.workerNodes
.reduce
<number[]>(
434 (accumulator
, workerNode
) =>
435 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
441 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
442 .waitTime
.median
&& {
445 this.workerNodes
.reduce
<number[]>(
446 (accumulator
, workerNode
) =>
447 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
459 * The pool readiness boolean status.
461 private get
ready (): boolean {
466 this.workerNodes
.reduce(
467 (accumulator
, workerNode
) =>
468 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
472 ) >= this.minimumNumberOfWorkers
/**
 * The pool emptiness boolean status.
 *
 * @returns `true` if the minimum number of workers is zero and the pool holds no worker node, `false` otherwise.
 */
protected get empty (): boolean {
  if (this.minimumNumberOfWorkers !== 0) {
    return false
  }
  return this.workerNodes.length === 0
}
/**
 * The approximate pool utilization.
 *
 * Computed as the sum of the worker nodes run time and wait time aggregates
 * divided by the pool time capacity, i.e. the time elapsed since the pool
 * start timestamp multiplied by the maximum number of workers (or the
 * minimum one when no maximum is defined).
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity =
    elapsedTime * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    // A missing aggregate counts as zero.
    totalTasksRunTime += usage.runTime.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
508 * If it is `'dynamic'`, it provides the `max` property.
510 protected abstract get
type (): PoolType
515 protected abstract get
worker (): WorkerType
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * The worker id must be defined and must match one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
546 public setWorkerChoiceStrategy (
547 workerChoiceStrategy
: WorkerChoiceStrategy
,
548 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
550 let requireSync
= false
551 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
552 if (workerChoiceStrategyOptions
!= null) {
553 requireSync
= this.setWorkerChoiceStrategyOptions(
554 workerChoiceStrategyOptions
557 if (workerChoiceStrategy
!== this.opts
.workerChoiceStrategy
) {
558 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
559 this.workerChoiceStrategiesContext
?.setDefaultWorkerChoiceStrategy(
560 this.opts
.workerChoiceStrategy
,
561 this.opts
.workerChoiceStrategyOptions
566 this.workerChoiceStrategiesContext
?.syncWorkerChoiceStrategies(
567 this.getWorkerWorkerChoiceStrategies(),
568 this.opts
.workerChoiceStrategyOptions
570 for (const workerNodeKey
of this.workerNodes
.keys()) {
571 this.sendStatisticsMessageToWorker(workerNodeKey
)
577 public setWorkerChoiceStrategyOptions (
578 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
580 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
581 if (workerChoiceStrategyOptions
!= null) {
582 this.opts
.workerChoiceStrategyOptions
= workerChoiceStrategyOptions
583 this.workerChoiceStrategiesContext
?.setOptions(
584 this.opts
.workerChoiceStrategyOptions
586 this.workerChoiceStrategiesContext
?.syncWorkerChoiceStrategies(
587 this.getWorkerWorkerChoiceStrategies(),
588 this.opts
.workerChoiceStrategyOptions
590 for (const workerNodeKey
of this.workerNodes
.keys()) {
591 this.sendStatisticsMessageToWorker(workerNodeKey
)
599 public enableTasksQueue (
601 tasksQueueOptions
?: TasksQueueOptions
603 if (this.opts
.enableTasksQueue
=== true && !enable
) {
604 this.unsetTaskStealing()
605 this.unsetTasksStealingOnBackPressure()
606 this.flushTasksQueues()
608 this.opts
.enableTasksQueue
= enable
609 this.setTasksQueueOptions(tasksQueueOptions
)
/**
 * Sets the tasks queue options.
 *
 * When the tasks queue is enabled, the options are validated, built and
 * stored, the resulting queue size is applied to the worker nodes, and the
 * task stealing handlers are detached and then attached again only if the
 * matching option is `true`. When the tasks queue is not enabled, any stored
 * tasks queue options are deleted.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }

  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(builtTasksQueueOptions.size!)

  // Task stealing on idle: handlers are always detached first.
  this.unsetTaskStealing()
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }

  // Tasks stealing on back pressure: same detach-then-attach sequence.
  this.unsetTasksStealingOnBackPressure()
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
639 private buildTasksQueueOptions (
640 tasksQueueOptions
: TasksQueueOptions
| undefined
641 ): TasksQueueOptions
{
643 ...getDefaultTasksQueueOptions(
644 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
/**
 * Sets the tasks queue back pressure size on every worker node of the pool.
 *
 * @param size - The size assigned to each worker node `tasksQueueBackPressureSize`.
 */
private setTasksQueueSize (size: number): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.workerNodes[workerNodeKey].tasksQueueBackPressureSize = size
  }
}
/**
 * Registers the worker node idle event handler as an `'idle'` listener on
 * every worker node of the pool.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
662 private unsetTaskStealing (): void {
663 for (const workerNodeKey
of this.workerNodes
.keys()) {
664 this.workerNodes
[workerNodeKey
].off(
666 this.handleWorkerNodeIdleEvent
671 private setTasksStealingOnBackPressure (): void {
672 for (const workerNodeKey
of this.workerNodes
.keys()) {
673 this.workerNodes
[workerNodeKey
].on(
675 this.handleWorkerNodeBackPressureEvent
680 private unsetTasksStealingOnBackPressure (): void {
681 for (const workerNodeKey
of this.workerNodes
.keys()) {
682 this.workerNodes
[workerNodeKey
].off(
684 this.handleWorkerNodeBackPressureEvent
/**
 * Whether the pool is full or not.
 *
 * The pool filling boolean status.
 *
 * @returns `true` if the number of worker nodes has reached the maximum number of workers (or the minimum one when no maximum is defined), `false` otherwise.
 */
protected get full (): boolean {
  const workerNodesCapacity =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= workerNodesCapacity
}
702 * Whether the pool is busy or not.
704 * The pool busyness boolean status.
706 protected abstract get
busy (): boolean
709 * Whether worker nodes are concurrently executing their tasks quota or not.
711 * @returns Worker nodes busyness boolean status.
713 protected internalBusy (): boolean {
714 if (this.opts
.enableTasksQueue
=== true) {
716 this.workerNodes
.findIndex(
718 workerNode
.info
.ready
&&
719 workerNode
.usage
.tasks
.executing
<
720 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
721 this.opts
.tasksQueueOptions
!.concurrency
!
726 this.workerNodes
.findIndex(
728 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
/**
 * Whether the worker node given its worker node key is busy or not.
 *
 * With the tasks queue enabled, a worker node is busy once its number of
 * executing tasks reaches the tasks queue concurrency. Otherwise, it is busy
 * as soon as it executes at least one task.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  if (this.opts.enableTasksQueue !== true) {
    return executing > 0
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing >= this.opts.tasksQueueOptions!.concurrency!
}
744 private async sendTaskFunctionOperationToWorker (
745 workerNodeKey
: number,
746 message
: MessageValue
<Data
>
747 ): Promise
<boolean> {
748 return await new Promise
<boolean>((resolve
, reject
) => {
749 const taskFunctionOperationListener
= (
750 message
: MessageValue
<Response
>
752 this.checkMessageWorkerId(message
)
753 const workerId
= this.getWorkerInfo(workerNodeKey
)?.id
755 message
.taskFunctionOperationStatus
!= null &&
756 message
.workerId
=== workerId
758 if (message
.taskFunctionOperationStatus
) {
763 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
767 this.deregisterWorkerMessageListener(
768 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
769 taskFunctionOperationListener
773 this.registerWorkerMessageListener(
775 taskFunctionOperationListener
777 this.sendToWorker(workerNodeKey
, message
)
781 private async sendTaskFunctionOperationToWorkers (
782 message
: MessageValue
<Data
>
783 ): Promise
<boolean> {
784 return await new Promise
<boolean>((resolve
, reject
) => {
785 const responsesReceived
= new Array<MessageValue
<Response
>>()
786 const taskFunctionOperationsListener
= (
787 message
: MessageValue
<Response
>
789 this.checkMessageWorkerId(message
)
790 if (message
.taskFunctionOperationStatus
!= null) {
791 responsesReceived
.push(message
)
792 if (responsesReceived
.length
=== this.workerNodes
.length
) {
794 responsesReceived
.every(
795 message
=> message
.taskFunctionOperationStatus
=== true
800 responsesReceived
.some(
801 message
=> message
.taskFunctionOperationStatus
=== false
804 const errorResponse
= responsesReceived
.find(
805 response
=> response
.taskFunctionOperationStatus
=== false
809 `Task function operation '${
810 message.taskFunctionOperation as string
811 }' failed on worker ${errorResponse?.workerId} with error: '${
812 errorResponse?.workerError?.message
817 this.deregisterWorkerMessageListener(
818 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
819 taskFunctionOperationsListener
824 for (const workerNodeKey
of this.workerNodes
.keys()) {
825 this.registerWorkerMessageListener(
827 taskFunctionOperationsListener
829 this.sendToWorker(workerNodeKey
, message
)
/**
 * Whether a task function with the given name is listed in the pool task
 * functions properties.
 *
 * @param name - The task function name.
 * @returns `true` if a listed task function has this name, `false` otherwise.
 */
public hasTaskFunction (name: string): boolean {
  for (const taskFunctionProperties of this.listTaskFunctionsProperties()) {
    if (taskFunctionProperties.name === name) {
      return true
    }
  }
  return false
}
842 public async addTaskFunction (
844 fn
: TaskFunction
<Data
, Response
> | TaskFunctionObject
<Data
, Response
>
845 ): Promise
<boolean> {
846 if (typeof name
!== 'string') {
847 throw new TypeError('name argument must be a string')
849 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
850 throw new TypeError('name argument must not be an empty string')
852 if (typeof fn
=== 'function') {
853 fn
= { taskFunction
: fn
} satisfies TaskFunctionObject
<Data
, Response
>
855 if (typeof fn
.taskFunction
!== 'function') {
856 throw new TypeError('taskFunction property must be a function')
858 const opResult
= await this.sendTaskFunctionOperationToWorkers({
859 taskFunctionOperation
: 'add',
860 taskFunctionProperties
: buildTaskFunctionProperties(name
, fn
),
861 taskFunction
: fn
.taskFunction
.toString()
863 this.taskFunctions
.set(name
, fn
)
864 this.workerChoiceStrategiesContext
?.syncWorkerChoiceStrategies(
865 this.getWorkerWorkerChoiceStrategies()
871 public async removeTaskFunction (name
: string): Promise
<boolean> {
872 if (!this.taskFunctions
.has(name
)) {
874 'Cannot remove a task function not handled on the pool side'
877 const opResult
= await this.sendTaskFunctionOperationToWorkers({
878 taskFunctionOperation
: 'remove',
879 taskFunctionProperties
: buildTaskFunctionProperties(
881 this.taskFunctions
.get(name
)
884 this.deleteTaskFunctionWorkerUsages(name
)
885 this.taskFunctions
.delete(name
)
886 this.workerChoiceStrategiesContext
?.syncWorkerChoiceStrategies(
887 this.getWorkerWorkerChoiceStrategies()
893 public listTaskFunctionsProperties (): TaskFunctionProperties
[] {
894 for (const workerNode
of this.workerNodes
) {
896 Array.isArray(workerNode
.info
.taskFunctionsProperties
) &&
897 workerNode
.info
.taskFunctionsProperties
.length
> 0
899 return workerNode
.info
.taskFunctionsProperties
906 * Gets task function strategy, if any.
908 * @param name - The task function name.
909 * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
911 private readonly getTaskFunctionWorkerWorkerChoiceStrategy
= (
913 ): WorkerChoiceStrategy
| undefined => {
915 return this.listTaskFunctionsProperties().find(
916 (taskFunctionProperties
: TaskFunctionProperties
) =>
917 taskFunctionProperties
.name
=== name
923 * Gets the worker choice strategies registered in this pool.
925 * @returns The worker choice strategies.
927 private readonly getWorkerWorkerChoiceStrategies
=
928 (): Set
<WorkerChoiceStrategy
> => {
930 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
931 this.opts
.workerChoiceStrategy
!,
932 ...(this.listTaskFunctionsProperties()
934 (taskFunctionProperties
: TaskFunctionProperties
) =>
935 taskFunctionProperties
.strategy
938 (strategy
: WorkerChoiceStrategy
| undefined) => strategy
!= null
939 ) as WorkerChoiceStrategy
[])
944 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
945 return await this.sendTaskFunctionOperationToWorkers({
946 taskFunctionOperation
: 'default',
947 taskFunctionProperties
: buildTaskFunctionProperties(
949 this.taskFunctions
.get(name
)
/**
 * Deletes the task function worker usage on every worker node of the pool.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.workerNodes[workerNodeKey].deleteTaskFunctionWorkerUsage(name)
  }
}
/**
 * Whether a task shall be executed right away on the given worker node.
 *
 * It is the case when the worker node tasks queue is empty and its number of
 * executing tasks is below the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing < this.opts.tasksQueueOptions!.concurrency!
}
970 public async execute (
973 transferList
?: readonly TransferListItem
[]
974 ): Promise
<Response
> {
975 return await new Promise
<Response
>((resolve
, reject
) => {
977 reject(new Error('Cannot execute a task on not started pool'))
980 if (this.destroying
) {
981 reject(new Error('Cannot execute a task on destroying pool'))
984 if (name
!= null && typeof name
!== 'string') {
985 reject(new TypeError('name argument must be a string'))
990 typeof name
=== 'string' &&
991 name
.trim().length
=== 0
993 reject(new TypeError('name argument must not be an empty string'))
996 if (transferList
!= null && !Array.isArray(transferList
)) {
997 reject(new TypeError('transferList argument must be an array'))
1000 const timestamp
= performance
.now()
1001 const workerNodeKey
= this.chooseWorkerNode(
1002 this.getTaskFunctionWorkerWorkerChoiceStrategy(name
)
1004 const task
: Task
<Data
> = {
1005 name
: name
?? DEFAULT_TASK_NAME
,
1006 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
1007 data
: data
?? ({} as Data
),
1010 taskId
: randomUUID()
1012 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1013 this.promiseResponseMap
.set(task
.taskId
!, {
1017 ...(this.emitter
!= null && {
1018 asyncResource
: new AsyncResource('poolifier:task', {
1019 triggerAsyncId
: this.emitter
.asyncId
,
1020 requireManualDestroy
: true
1025 this.opts
.enableTasksQueue
=== false ||
1026 (this.opts
.enableTasksQueue
=== true &&
1027 this.shallExecuteTask(workerNodeKey
))
1029 this.executeTask(workerNodeKey
, task
)
1031 this.enqueueTask(workerNodeKey
, task
)
1037 * Starts the minimum number of workers.
1039 private startMinimumNumberOfWorkers (): void {
1040 this.startingMinimumNumberOfWorkers
= true
1042 this.workerNodes
.reduce(
1043 (accumulator
, workerNode
) =>
1044 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
1046 ) < this.minimumNumberOfWorkers
1048 this.createAndSetupWorkerNode()
1050 this.startingMinimumNumberOfWorkers
= false
1054 public start (): void {
1056 throw new Error('Cannot start an already started pool')
1058 if (this.starting
) {
1059 throw new Error('Cannot start an already starting pool')
1061 if (this.destroying
) {
1062 throw new Error('Cannot start a destroying pool')
1064 this.starting
= true
1065 this.startMinimumNumberOfWorkers()
1066 this.starting
= false
1071 public async destroy (): Promise
<void> {
1072 if (!this.started
) {
1073 throw new Error('Cannot destroy an already destroyed pool')
1075 if (this.starting
) {
1076 throw new Error('Cannot destroy an starting pool')
1078 if (this.destroying
) {
1079 throw new Error('Cannot destroy an already destroying pool')
1081 this.destroying
= true
1083 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
1084 await this.destroyWorkerNode(workerNodeKey
)
1087 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
1088 this.emitter
?.emitDestroy()
1089 this.readyEventEmitted
= false
1090 this.destroying
= false
1091 this.started
= false
1094 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
1095 await new Promise
<void>((resolve
, reject
) => {
1096 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1097 if (this.workerNodes
[workerNodeKey
] == null) {
1101 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1102 this.checkMessageWorkerId(message
)
1103 if (message
.kill
=== 'success') {
1105 } else if (message
.kill
=== 'failure') {
1108 `Kill message handling failed on worker ${message.workerId}`
1113 // FIXME: should be registered only once
1114 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1115 this.sendToWorker(workerNodeKey
, { kill
: true })
1120 * Terminates the worker node given its worker node key.
1122 * @param workerNodeKey - The worker node key.
1124 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1125 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1126 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1127 const workerNode
= this.workerNodes
[workerNodeKey
]
1128 await waitWorkerNodeEvents(
1132 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1133 getDefaultTasksQueueOptions(
1134 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1135 ).tasksFinishedTimeout
1137 await this.sendKillMessageToWorker(workerNodeKey
)
1138 await workerNode
.terminate()
1142 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1143 * Can be overridden.
1147 protected setupHook (): void {
1148 /* Intentionally empty */
1152 * Returns whether the worker is the main worker or not.
1154 * @returns `true` if the worker is the main worker, `false` otherwise.
1156 protected abstract isMain (): boolean
1159 * Hook executed before the worker task execution.
1160 * Can be overridden.
1162 * @param workerNodeKey - The worker node key.
1163 * @param task - The task to execute.
1165 protected beforeTaskExecutionHook (
1166 workerNodeKey
: number,
1169 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1170 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1171 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1172 ++workerUsage
.tasks
.executing
1173 updateWaitTimeWorkerUsage(
1174 this.workerChoiceStrategiesContext
,
1180 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1181 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1182 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(task
.name
!) !=
1185 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1186 const taskFunctionWorkerUsage
= this.workerNodes
[
1188 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1189 ].getTaskFunctionWorkerUsage(task
.name
!)!
1190 ++taskFunctionWorkerUsage
.tasks
.executing
1191 updateWaitTimeWorkerUsage(
1192 this.workerChoiceStrategiesContext
,
1193 taskFunctionWorkerUsage
,
/**
 * Hook run after a worker has answered a task execution.
 * Can be overridden.
 *
 * Updates the worker usage and, when task function usage is tracked, the
 * task function worker usage with the statistics carried by the message,
 * then notifies the worker choice strategies if anything was updated.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies task, run time and ELU statistics to a single usage object.
  const applyStatistics = (
    usage: Parameters<typeof updateTaskStatisticsWorkerUsage>[0]
  ): void => {
    updateTaskStatisticsWorkerUsage(usage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      usage,
      message
    )
    updateEluWorkerUsage(this.workerChoiceStrategiesContext, usage, message)
  }
  let usageUpdated = false
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage != null) {
    applyStatistics(workerUsage)
    usageUpdated = true
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // The task name is only read once task function usage is known to be tracked.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)
    if (taskFunctionWorkerUsage != null) {
      applyStatistics(taskFunctionWorkerUsage)
      usageUpdated = true
    }
  }
  if (usageUpdated) {
    this.workerChoiceStrategiesContext?.update(workerNodeKey)
  }
}
/**
 * Tells whether the task function worker usage of a worker node shall be updated.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node exposes an array of more than two task functions properties, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionsProperties =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties
  return (
    Array.isArray(taskFunctionsProperties) &&
    taskFunctionsProperties.length > 2
  )
}
/**
 * Picks the worker node that will receive the next task.
 *
 * A dynamic worker node is created first when `shallCreateDynamicWorker()` allows it;
 * its key is returned directly only if the strategies policy enables dynamic worker usage.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @returns The chosen worker node key
 */
private chooseWorkerNode (
  workerChoiceStrategy?: WorkerChoiceStrategy
): number {
  const dynamicWorkerNodeKey = this.shallCreateDynamicWorker()
    ? this.createAndSetupDynamicWorkerNode()
    : undefined
  if (
    dynamicWorkerNodeKey != null &&
    this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage === true
  ) {
    return dynamicWorkerNodeKey
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
}
/**
 * Decides whether a dynamic worker must be created.
 *
 * @returns Whether to create a dynamic worker or not.
 */
protected abstract shallCreateDynamicWorker (): boolean
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: readonly TransferListItem[]
): void
/**
 * Creates a worker node, wires its event handlers and adds it to the pool.
 *
 * User supplied handlers (`onlineHandler`, `messageHandler`, `errorHandler`,
 * `exitHandler`) are registered alongside the pool's own one-shot `error`
 * and `exit` handlers.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  // The pool only reacts to worker failures while started and not being destroyed.
  const poolIsRunning = (): boolean => this.started && !this.destroying
  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('error', (error: Error) => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (poolIsRunning() && this.opts.restartWorkerOnError === true) {
      // Replace the failed worker node with one of the same kind.
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else if (!this.startingMinimumNumberOfWorkers) {
        this.startMinimumNumberOfWorkers()
      }
    }
    if (poolIsRunning() && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.terminate().catch((terminationError: unknown) => {
      this.emitter?.emit(PoolEvents.error, terminationError)
    })
  })
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', () => {
    this.removeWorkerNode(workerNode)
    if (poolIsRunning() && !this.startingMinimumNumberOfWorkers) {
      this.startMinimumNumberOfWorkers()
    }
  })
  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
/**
 * Creates a worker node through `createAndSetupWorkerNode()` and configures it as dynamic:
 * registers a kill message listener, sends it the registered task functions and flags it as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
    // Evaluated lazily: the usage is only read when a soft kill was requested.
    const hasNoPendingWork = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return workerUsage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          workerUsage.tasks.executing === 0 &&
          this.tasksQueueSize(localWorkerNodeKey) === 0
        )
      }
      return false
    }
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    ) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Iterating an empty map is a no-op, so no size check is needed.
  for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionProperties: buildTaskFunctionProperties(
        taskFunctionName,
        taskFunctionObject
      ),
      taskFunction: taskFunctionObject.taskFunction.toString()
    }).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.info.dynamic = true
  const policy = this.workerChoiceStrategiesContext?.getPolicy()
  if (
    policy?.dynamicWorkerReady === true ||
    policy?.dynamicWorkerUsage === true
  ) {
    workerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
/**
 * Registers a message listener on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Registers a one-shot message listener on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Removes a message listener from the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Hook called once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and statistics
 * messages, and subscribes to the worker node task stealing events enabled
 * by the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
 * Sends the statistics message to the worker identified by its worker node key.
 *
 * The `runTime` and `elu` flags mirror the `aggregate` task statistics
 * requirements of the worker choice strategies, defaulting to `false`.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const requirements =
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: requirements?.runTime.aggregate ?? false,
      elu: requirements?.elu.aggregate ?? false
    }
  })
}
/**
 * Tells whether task stealing is pointless in the current pool state.
 *
 * @returns `true` if the pool has at most one worker node or no queued tasks, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
/**
 * Executes the task right away when the worker node can take it, enqueues it otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
/**
 * Moves every task queued on the given worker node to other worker nodes.
 *
 * For each task, the destination is the ready worker node with the fewest
 * queued tasks, the search starting from worker node key `0`.
 *
 * @param workerNodeKey - The key of the worker node whose queue is emptied; `-1` is a no-op.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = 0
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (
        candidate.info.ready &&
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(workerNodeKey)!
    )
  }
}
/**
 * Increments the stolen tasks counter of a worker node and, when tracked,
 * of its usage for the given task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
/**
 * Increments the sequentially stolen tasks counter of a worker node, if it exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  ++workerUsage.tasks.sequentiallyStolen
}
/**
 * Increments the sequentially stolen tasks counter of a worker node usage
 * for the given task function, when task function usage is tracked.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
  }
}
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node, if it exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  workerUsage.tasks.sequentiallyStolen = 0
}
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node usage
 * for the given task function, when task function usage is tracked.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
/**
 * Worker node `idle` event handler: lets the idle worker node steal a task
 * and re-schedules itself after an exponential delay.
 *
 * @param eventDetail - The worker node event detail; its `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation of the stealing loop, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `workerNodeKey` is not defined or the worker node is not found in the pool.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  // Stealing is pointless, or too many worker nodes are already stealing.
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    if (workerInfo != null && previousStolenTask != null) {
      workerInfo.stealing = false
    }
    return
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // The worker node got work after a stealing sequence: end the sequence.
  if (
    workerInfo != null &&
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    workerInfo.stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const { name } of workerInfo.taskFunctionsProperties!) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        name
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  if (workerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  workerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    stolenTask != null &&
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const stolenTaskName = stolenTask.name!
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ].getTaskFunctionWorkerUsage(stolenTaskName)!.tasks
    const continuesSequence =
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    if (continuesSequence) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    }
  }
  // Back off before the next stealing attempt.
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
/**
 * Steals one queued task from another worker node for the given worker node.
 *
 * The source is the first ready, non-stealing worker node with queued tasks,
 * searched in descending order of queued tasks. The stealing worker node is
 * excluded by identity: the candidates are a sorted copy of the pool worker
 * nodes, so an index in that copy is not a pool worker node key and must not
 * be compared with `workerNodeKey`.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no worker node had a task to steal.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  const sourceWorkerNode = workerNodes.find(
    sourceWorkerNode =>
      sourceWorkerNode !== stealingWorkerNode &&
      sourceWorkerNode.info.ready &&
      !sourceWorkerNode.info.stealing &&
      sourceWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode != null) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const task = sourceWorkerNode.popTask()!
    this.handleTask(workerNodeKey, task)
    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
    return task
  }
}
/**
 * Worker node `backPressure` event handler: hands tasks queued on the worker
 * node under back pressure to other worker nodes.
 *
 * Candidates are visited in ascending order of queued tasks. Each ready,
 * non-stealing candidate whose queue is below the configured tasks queue size
 * minus one receives one task, as long as the source still has queued tasks.
 * Candidates are a sorted copy of the pool worker nodes, so the pool worker
 * node key is resolved from the node itself rather than from its position in
 * that copy.
 *
 * @param eventDetail - The worker node event detail; its `workerId` identifies the worker node under back pressure.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If a candidate worker node is not found in the pool.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      // Key of the candidate in the pool, not its index in the sorted copy.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      workerInfo.stealing = true
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      workerInfo.stealing = false
    }
  }
}
/**
 * Message listener registered on each worker.
 *
 * Dispatches worker ready responses, task function properties updates and
 * task execution responses to their respective handlers.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionsProperties } = message
  if (taskFunctionsProperties != null) {
    if (ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    // Task function properties message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    if (workerInfo != null) {
      workerInfo.taskFunctionsProperties = taskFunctionsProperties
    }
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
  }
}
/**
 * Emits the pool `ready` event once, when the pool is ready and the event was not emitted yet.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
/**
 * Handles a worker ready response: stores the ready flag and the task
 * functions properties on the worker node, then checks whether the pool
 * `ready` event can be emitted.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionsProperties } = message
  if (ready == null || !ready) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const { info } = this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  info.ready = ready
  info.taskFunctionsProperties = taskFunctionsProperties
  this.checkAndEmitReadyEvent()
}
/**
 * Handles a task execution response: settles the promise bound to the task
 * id, updates statistics through `afterTaskExecutionHook()`, and, when the
 * tasks queue is enabled, runs the next queued task and emits the worker
 * node `idle` event if it has nothing left to do.
 *
 * Messages whose task id has no entry in the promise response map are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode == null
  ) {
    return
  }
  const workerNodeTasksUsage = workerNode.usage.tasks
  // Run the next queued task if the concurrency limit allows it.
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    workerNodeTasksUsage.executing <
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.concurrency!
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  // Nothing executing, nothing queued and no stealing sequence: the worker node is idle.
  if (
    workerNodeTasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    workerNodeTasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idle', {
      workerId,
      workerNodeKey
    })
  }
}
/**
 * Emits the pool `busy` event when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
/**
 * Emits the pool `backPressure` event when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
/**
 * Emits the events related to dynamic worker creation.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
 * Looks up the information of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  return workerNode?.info
}
/**
 * Instantiates a worker node from the pool options.
 *
 * The tasks queue back pressure size falls back to the default tasks queue
 * options size when none is configured.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const workerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize
    }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  return workerNode
}
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  const { workerNodes } = this
  workerNodes.push(workerNode)
  const workerNodeKey = workerNodes.indexOf(workerNode)
  if (workerNodeKey !== -1) {
    return workerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
/**
 * Emits the pool `empty` event when the pool is empty and re-arms the `ready` event.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
/**
 * Removes the worker node from the pool worker nodes and from the worker
 * choice strategies, then checks whether the pool became empty.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const workerNodeKey = this.workerNodes.indexOf(workerNode)
  const found = workerNodeKey !== -1
  if (found) {
    this.workerNodes.splice(workerNodeKey, 1)
    this.workerChoiceStrategiesContext?.remove(workerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
/**
 * Marks the worker node at the given key as not ready, if it exists.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
/**
 * Tells whether the pool has back pressure.
 *
 * @returns `true` if the tasks queue is enabled and every worker node has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
/**
 * Runs the given task on the worker identified by its worker node key:
 * calls the before-execution hook, sends the task to the worker, then
 * checks the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the task on the worker node and checks the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
/**
 * Dequeues a task from the worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
/**
 * Executes every task queued on the worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  // The length is re-read on each iteration, as the keys iterator would do.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}