1 import { AsyncResource
} from
'node:async_hooks'
2 import { randomUUID
} from
'node:crypto'
3 import { EventEmitterAsyncResource
} from
'node:events'
4 import { performance
} from
'node:perf_hooks'
5 import type { TransferListItem
} from
'node:worker_threads'
9 PromiseResponseWrapper
,
11 } from
'../utility-types.js'
25 import type { TaskFunction
} from
'../worker/task-functions.js'
26 import { KillBehaviors
} from
'../worker/worker-options.js'
34 type TasksQueueOptions
38 WorkerChoiceStrategies
,
39 type WorkerChoiceStrategy
,
40 type WorkerChoiceStrategyOptions
41 } from
'./selection-strategies/selection-strategies-types.js'
42 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
45 checkValidTasksQueueOptions
,
46 checkValidWorkerChoiceStrategy
,
47 getDefaultTasksQueueOptions
,
49 updateRunTimeWorkerUsage
,
50 updateTaskStatisticsWorkerUsage
,
51 updateWaitTimeWorkerUsage
,
54 import { version
} from
'./version.js'
59 WorkerNodeEventDetail
,
62 import { WorkerNode
} from
'./worker-node.js'
65 * Base class that implements some shared logic for all poolifier pools.
67 * @typeParam Worker - Type of worker which manages this pool.
68 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
69 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
71 export abstract class AbstractPool
<
72 Worker
extends IWorker
,
75 > implements IPool
<Worker
, Data
, Response
> {
77 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
80 public emitter
?: EventEmitterAsyncResource
83 * The task execution response promise map:
84 * - `key`: The message id of each submitted task.
85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
89 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
90 new Map
<string, PromiseResponseWrapper
<Response
>>()
93 * Worker choice strategy context referencing a worker choice algorithm implementation.
95 protected workerChoiceStrategyContext
?: WorkerChoiceStrategyContext
<
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
106 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
109 * Whether the pool is started or not.
111 private started
: boolean
113 * Whether the pool is starting or not.
115 private starting
: boolean
117 * Whether the pool is destroying or not.
119 private destroying
: boolean
121 * Whether the minimum number of workers is starting or not.
123 private startingMinimumNumberOfWorkers
: boolean
125 * Whether the pool ready event has been emitted or not.
127 private readyEventEmitted
: boolean
129 * The start timestamp of the pool.
131 private readonly startTimestamp
134 * Constructs a new poolifier pool.
136 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
137 * @param filePath - Path to the worker file.
138 * @param opts - Options for the pool.
139 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
142 protected readonly minimumNumberOfWorkers
: number,
143 protected readonly filePath
: string,
144 protected readonly opts
: PoolOptions
<Worker
>,
145 protected readonly maximumNumberOfWorkers
?: number
147 if (!this.isMain()) {
149 'Cannot start a pool from a worker with the same type as the pool'
153 checkFilePath(this.filePath
)
154 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
155 this.checkPoolOptions(this.opts
)
157 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
158 this.executeTask
= this.executeTask
.bind(this)
159 this.enqueueTask
= this.enqueueTask
.bind(this)
161 if (this.opts
.enableEvents
=== true) {
162 this.initializeEventEmitter()
164 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
170 this.opts
.workerChoiceStrategy
,
171 this.opts
.workerChoiceStrategyOptions
176 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
179 this.starting
= false
180 this.destroying
= false
181 this.readyEventEmitted
= false
182 this.startingMinimumNumberOfWorkers
= false
183 if (this.opts
.startWorkers
=== true) {
187 this.startTimestamp
= performance
.now()
190 private checkPoolType (): void {
191 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
193 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
198 private checkMinimumNumberOfWorkers (
199 minimumNumberOfWorkers
: number | undefined
201 if (minimumNumberOfWorkers
== null) {
203 'Cannot instantiate a pool without specifying the number of workers'
205 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
207 'Cannot instantiate a pool with a non safe integer number of workers'
209 } else if (minimumNumberOfWorkers
< 0) {
210 throw new RangeError(
211 'Cannot instantiate a pool with a negative number of workers'
213 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
214 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
218 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
219 if (isPlainObject(opts
)) {
220 this.opts
.startWorkers
= opts
.startWorkers
?? true
221 checkValidWorkerChoiceStrategy(opts
.workerChoiceStrategy
)
222 this.opts
.workerChoiceStrategy
=
223 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
224 this.checkValidWorkerChoiceStrategyOptions(
225 opts
.workerChoiceStrategyOptions
227 if (opts
.workerChoiceStrategyOptions
!= null) {
228 this.opts
.workerChoiceStrategyOptions
= opts
.workerChoiceStrategyOptions
230 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
231 this.opts
.enableEvents
= opts
.enableEvents
?? true
232 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
233 if (this.opts
.enableTasksQueue
) {
234 checkValidTasksQueueOptions(opts
.tasksQueueOptions
)
235 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
236 opts
.tasksQueueOptions
240 throw new TypeError('Invalid pool options: must be a plain object')
244 private checkValidWorkerChoiceStrategyOptions (
245 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
248 workerChoiceStrategyOptions
!= null &&
249 !isPlainObject(workerChoiceStrategyOptions
)
252 'Invalid worker choice strategy options: must be a plain object'
256 workerChoiceStrategyOptions
?.weights
!= null &&
257 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
258 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
261 'Invalid worker choice strategy options: must have a weight for each worker node'
265 workerChoiceStrategyOptions
?.measurement
!= null &&
266 !Object.values(Measurements
).includes(
267 workerChoiceStrategyOptions
.measurement
271 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
276 private initializeEventEmitter (): void {
277 this.emitter
= new EventEmitterAsyncResource({
278 name
: `poolifier:${this.type}-${this.worker}-pool`
283 public get
info (): PoolInfo
{
288 started
: this.started
,
290 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
291 strategy
: this.opts
.workerChoiceStrategy
!,
292 strategyRetries
: this.workerChoiceStrategyContext
?.retriesCount
?? 0,
293 minSize
: this.minimumNumberOfWorkers
,
294 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
295 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
296 .runTime
.aggregate
=== true &&
297 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
298 .waitTime
.aggregate
&& {
299 utilization
: round(this.utilization
)
301 workerNodes
: this.workerNodes
.length
,
302 idleWorkerNodes
: this.workerNodes
.reduce(
303 (accumulator
, workerNode
) =>
304 workerNode
.usage
.tasks
.executing
=== 0
309 ...(this.opts
.enableTasksQueue
=== true && {
310 stealingWorkerNodes
: this.workerNodes
.reduce(
311 (accumulator
, workerNode
) =>
312 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
316 busyWorkerNodes
: this.workerNodes
.reduce(
317 (accumulator
, _workerNode
, workerNodeKey
) =>
318 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
321 executedTasks
: this.workerNodes
.reduce(
322 (accumulator
, workerNode
) =>
323 accumulator
+ workerNode
.usage
.tasks
.executed
,
326 executingTasks
: this.workerNodes
.reduce(
327 (accumulator
, workerNode
) =>
328 accumulator
+ workerNode
.usage
.tasks
.executing
,
331 ...(this.opts
.enableTasksQueue
=== true && {
332 queuedTasks
: this.workerNodes
.reduce(
333 (accumulator
, workerNode
) =>
334 accumulator
+ workerNode
.usage
.tasks
.queued
,
338 ...(this.opts
.enableTasksQueue
=== true && {
339 maxQueuedTasks
: this.workerNodes
.reduce(
340 (accumulator
, workerNode
) =>
341 accumulator
+ (workerNode
.usage
.tasks
.maxQueued
?? 0),
345 ...(this.opts
.enableTasksQueue
=== true && {
346 backPressure
: this.hasBackPressure()
348 ...(this.opts
.enableTasksQueue
=== true && {
349 stolenTasks
: this.workerNodes
.reduce(
350 (accumulator
, workerNode
) =>
351 accumulator
+ workerNode
.usage
.tasks
.stolen
,
355 failedTasks
: this.workerNodes
.reduce(
356 (accumulator
, workerNode
) =>
357 accumulator
+ workerNode
.usage
.tasks
.failed
,
360 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
361 .runTime
.aggregate
=== true && {
365 ...this.workerNodes
.map(
366 workerNode
=> workerNode
.usage
.runTime
.minimum
?? Infinity
372 ...this.workerNodes
.map(
373 workerNode
=> workerNode
.usage
.runTime
.maximum
?? -Infinity
377 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
378 .runTime
.average
&& {
381 this.workerNodes
.reduce
<number[]>(
382 (accumulator
, workerNode
) =>
383 accumulator
.concat(workerNode
.usage
.runTime
.history
),
389 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
393 this.workerNodes
.reduce
<number[]>(
394 (accumulator
, workerNode
) =>
395 accumulator
.concat(workerNode
.usage
.runTime
.history
),
403 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
404 .waitTime
.aggregate
=== true && {
408 ...this.workerNodes
.map(
409 workerNode
=> workerNode
.usage
.waitTime
.minimum
?? Infinity
415 ...this.workerNodes
.map(
416 workerNode
=> workerNode
.usage
.waitTime
.maximum
?? -Infinity
420 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
421 .waitTime
.average
&& {
424 this.workerNodes
.reduce
<number[]>(
425 (accumulator
, workerNode
) =>
426 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
432 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
433 .waitTime
.median
&& {
436 this.workerNodes
.reduce
<number[]>(
437 (accumulator
, workerNode
) =>
438 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
450 * The pool readiness boolean status.
452 private get
ready (): boolean {
457 this.workerNodes
.reduce(
458 (accumulator
, workerNode
) =>
459 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
463 ) >= this.minimumNumberOfWorkers
468 * The pool emptiness boolean status.
470 protected get
empty (): boolean {
471 return this.minimumNumberOfWorkers
=== 0 && this.workerNodes
.length
=== 0
475 * The approximate pool utilization.
477 * @returns The pool utilization.
479 private get
utilization (): number {
480 const poolTimeCapacity
=
481 (performance
.now() - this.startTimestamp
) *
482 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
483 const totalTasksRunTime
= this.workerNodes
.reduce(
484 (accumulator
, workerNode
) =>
485 accumulator
+ (workerNode
.usage
.runTime
.aggregate
?? 0),
488 const totalTasksWaitTime
= this.workerNodes
.reduce(
489 (accumulator
, workerNode
) =>
490 accumulator
+ (workerNode
.usage
.waitTime
.aggregate
?? 0),
493 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
499 * If it is `'dynamic'`, it provides the `max` property.
501 protected abstract get
type (): PoolType
506 protected abstract get
worker (): WorkerType
509 * Checks if the worker id sent in the received message from a worker is valid.
511 * @param message - The received message.
512 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
514 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
515 if (message
.workerId
== null) {
516 throw new Error('Worker message received without worker id')
517 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
519 `Worker message received from unknown worker '${message.workerId}'`
525 * Gets the worker node key given its worker id.
527 * @param workerId - The worker id.
528 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
530 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
531 return this.workerNodes
.findIndex(
532 workerNode
=> workerNode
.info
.id
=== workerId
537 public setWorkerChoiceStrategy (
538 workerChoiceStrategy
: WorkerChoiceStrategy
,
539 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
541 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
542 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
543 this.workerChoiceStrategyContext
?.setWorkerChoiceStrategy(
544 this.opts
.workerChoiceStrategy
546 if (workerChoiceStrategyOptions
!= null) {
547 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
549 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
550 workerNode
.resetUsage()
551 this.sendStatisticsMessageToWorker(workerNodeKey
)
556 public setWorkerChoiceStrategyOptions (
557 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
559 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
560 if (workerChoiceStrategyOptions
!= null) {
561 this.opts
.workerChoiceStrategyOptions
= workerChoiceStrategyOptions
563 this.workerChoiceStrategyContext
?.setOptions(
564 this.opts
.workerChoiceStrategyOptions
569 public enableTasksQueue (
571 tasksQueueOptions
?: TasksQueueOptions
573 if (this.opts
.enableTasksQueue
=== true && !enable
) {
574 this.unsetTaskStealing()
575 this.unsetTasksStealingOnBackPressure()
576 this.flushTasksQueues()
578 this.opts
.enableTasksQueue
= enable
579 this.setTasksQueueOptions(tasksQueueOptions
)
583 public setTasksQueueOptions (
584 tasksQueueOptions
: TasksQueueOptions
| undefined
586 if (this.opts
.enableTasksQueue
=== true) {
587 checkValidTasksQueueOptions(tasksQueueOptions
)
588 this.opts
.tasksQueueOptions
=
589 this.buildTasksQueueOptions(tasksQueueOptions
)
590 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
591 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
!)
592 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
593 this.unsetTaskStealing()
594 this.setTaskStealing()
596 this.unsetTaskStealing()
598 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
599 this.unsetTasksStealingOnBackPressure()
600 this.setTasksStealingOnBackPressure()
602 this.unsetTasksStealingOnBackPressure()
604 } else if (this.opts
.tasksQueueOptions
!= null) {
605 delete this.opts
.tasksQueueOptions
609 private buildTasksQueueOptions (
610 tasksQueueOptions
: TasksQueueOptions
| undefined
611 ): TasksQueueOptions
{
613 ...getDefaultTasksQueueOptions(
614 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
620 private setTasksQueueSize (size
: number): void {
621 for (const workerNode
of this.workerNodes
) {
622 workerNode
.tasksQueueBackPressureSize
= size
626 private setTaskStealing (): void {
627 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
628 this.workerNodes
[workerNodeKey
].on('idle', this.handleWorkerNodeIdleEvent
)
632 private unsetTaskStealing (): void {
633 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
634 this.workerNodes
[workerNodeKey
].off(
636 this.handleWorkerNodeIdleEvent
641 private setTasksStealingOnBackPressure (): void {
642 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
643 this.workerNodes
[workerNodeKey
].on(
645 this.handleWorkerNodeBackPressureEvent
650 private unsetTasksStealingOnBackPressure (): void {
651 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
652 this.workerNodes
[workerNodeKey
].off(
654 this.handleWorkerNodeBackPressureEvent
660 * Whether the pool is full or not.
662 * The pool filling boolean status.
664 protected get
full (): boolean {
666 this.workerNodes
.length
>=
667 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
672 * Whether the pool is busy or not.
674 * The pool busyness boolean status.
676 protected abstract get
busy (): boolean
679 * Whether worker nodes are executing concurrently their tasks quota or not.
681 * @returns Worker nodes busyness boolean status.
683 protected internalBusy (): boolean {
684 if (this.opts
.enableTasksQueue
=== true) {
686 this.workerNodes
.findIndex(
688 workerNode
.info
.ready
&&
689 workerNode
.usage
.tasks
.executing
<
690 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
691 this.opts
.tasksQueueOptions
!.concurrency
!
696 this.workerNodes
.findIndex(
698 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
703 private isWorkerNodeBusy (workerNodeKey
: number): boolean {
704 if (this.opts
.enableTasksQueue
=== true) {
706 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
>=
707 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
708 this.opts
.tasksQueueOptions
!.concurrency
!
711 return this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
> 0
714 private async sendTaskFunctionOperationToWorker (
715 workerNodeKey
: number,
716 message
: MessageValue
<Data
>
717 ): Promise
<boolean> {
718 return await new Promise
<boolean>((resolve
, reject
) => {
719 const taskFunctionOperationListener
= (
720 message
: MessageValue
<Response
>
722 this.checkMessageWorkerId(message
)
723 const workerId
= this.getWorkerInfo(workerNodeKey
)?.id
725 message
.taskFunctionOperationStatus
!= null &&
726 message
.workerId
=== workerId
728 if (message
.taskFunctionOperationStatus
) {
733 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
737 this.deregisterWorkerMessageListener(
738 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
739 taskFunctionOperationListener
743 this.registerWorkerMessageListener(
745 taskFunctionOperationListener
747 this.sendToWorker(workerNodeKey
, message
)
751 private async sendTaskFunctionOperationToWorkers (
752 message
: MessageValue
<Data
>
753 ): Promise
<boolean> {
754 return await new Promise
<boolean>((resolve
, reject
) => {
755 const responsesReceived
= new Array<MessageValue
<Response
>>()
756 const taskFunctionOperationsListener
= (
757 message
: MessageValue
<Response
>
759 this.checkMessageWorkerId(message
)
760 if (message
.taskFunctionOperationStatus
!= null) {
761 responsesReceived
.push(message
)
762 if (responsesReceived
.length
=== this.workerNodes
.length
) {
764 responsesReceived
.every(
765 message
=> message
.taskFunctionOperationStatus
=== true
770 responsesReceived
.some(
771 message
=> message
.taskFunctionOperationStatus
=== false
774 const errorResponse
= responsesReceived
.find(
775 response
=> response
.taskFunctionOperationStatus
=== false
779 `Task function operation '${
780 message.taskFunctionOperation as string
781 }' failed on worker ${errorResponse?.workerId} with error: '${
782 errorResponse?.workerError?.message
787 this.deregisterWorkerMessageListener(
788 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
789 taskFunctionOperationsListener
794 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
795 this.registerWorkerMessageListener(
797 taskFunctionOperationsListener
799 this.sendToWorker(workerNodeKey
, message
)
805 public hasTaskFunction (name
: string): boolean {
806 for (const workerNode
of this.workerNodes
) {
808 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
809 workerNode
.info
.taskFunctionNames
.includes(name
)
818 public async addTaskFunction (
820 fn
: TaskFunction
<Data
, Response
>
821 ): Promise
<boolean> {
822 if (typeof name
!== 'string') {
823 throw new TypeError('name argument must be a string')
825 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
826 throw new TypeError('name argument must not be an empty string')
828 if (typeof fn
!== 'function') {
829 throw new TypeError('fn argument must be a function')
831 const opResult
= await this.sendTaskFunctionOperationToWorkers({
832 taskFunctionOperation
: 'add',
833 taskFunctionName
: name
,
834 taskFunction
: fn
.toString()
836 this.taskFunctions
.set(name
, fn
)
841 public async removeTaskFunction (name
: string): Promise
<boolean> {
842 if (!this.taskFunctions
.has(name
)) {
844 'Cannot remove a task function not handled on the pool side'
847 const opResult
= await this.sendTaskFunctionOperationToWorkers({
848 taskFunctionOperation
: 'remove',
849 taskFunctionName
: name
851 this.deleteTaskFunctionWorkerUsages(name
)
852 this.taskFunctions
.delete(name
)
857 public listTaskFunctionNames (): string[] {
858 for (const workerNode
of this.workerNodes
) {
860 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
861 workerNode
.info
.taskFunctionNames
.length
> 0
863 return workerNode
.info
.taskFunctionNames
870 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
871 return await this.sendTaskFunctionOperationToWorkers({
872 taskFunctionOperation
: 'default',
873 taskFunctionName
: name
877 private deleteTaskFunctionWorkerUsages (name
: string): void {
878 for (const workerNode
of this.workerNodes
) {
879 workerNode
.deleteTaskFunctionWorkerUsage(name
)
883 private shallExecuteTask (workerNodeKey
: number): boolean {
885 this.tasksQueueSize(workerNodeKey
) === 0 &&
886 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
887 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
888 this.opts
.tasksQueueOptions
!.concurrency
!
893 public async execute (
896 transferList
?: TransferListItem
[]
897 ): Promise
<Response
> {
898 return await new Promise
<Response
>((resolve
, reject
) => {
900 reject(new Error('Cannot execute a task on not started pool'))
903 if (this.destroying
) {
904 reject(new Error('Cannot execute a task on destroying pool'))
907 if (name
!= null && typeof name
!== 'string') {
908 reject(new TypeError('name argument must be a string'))
913 typeof name
=== 'string' &&
914 name
.trim().length
=== 0
916 reject(new TypeError('name argument must not be an empty string'))
919 if (transferList
!= null && !Array.isArray(transferList
)) {
920 reject(new TypeError('transferList argument must be an array'))
923 const timestamp
= performance
.now()
924 const workerNodeKey
= this.chooseWorkerNode()
925 const task
: Task
<Data
> = {
926 name
: name
?? DEFAULT_TASK_NAME
,
927 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
928 data
: data
?? ({} as Data
),
933 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
934 this.promiseResponseMap
.set(task
.taskId
!, {
938 ...(this.emitter
!= null && {
939 asyncResource
: new AsyncResource('poolifier:task', {
940 triggerAsyncId
: this.emitter
.asyncId
,
941 requireManualDestroy
: true
946 this.opts
.enableTasksQueue
=== false ||
947 (this.opts
.enableTasksQueue
=== true &&
948 this.shallExecuteTask(workerNodeKey
))
950 this.executeTask(workerNodeKey
, task
)
952 this.enqueueTask(workerNodeKey
, task
)
958 * Starts the minimum number of workers.
960 private startMinimumNumberOfWorkers (): void {
961 this.startingMinimumNumberOfWorkers
= true
963 this.workerNodes
.reduce(
964 (accumulator
, workerNode
) =>
965 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
967 ) < this.minimumNumberOfWorkers
969 this.createAndSetupWorkerNode()
971 this.startingMinimumNumberOfWorkers
= false
975 public start (): void {
977 throw new Error('Cannot start an already started pool')
980 throw new Error('Cannot start an already starting pool')
982 if (this.destroying
) {
983 throw new Error('Cannot start a destroying pool')
986 this.startMinimumNumberOfWorkers()
987 this.starting
= false
992 public async destroy (): Promise
<void> {
994 throw new Error('Cannot destroy an already destroyed pool')
997 throw new Error('Cannot destroy an starting pool')
999 if (this.destroying
) {
1000 throw new Error('Cannot destroy an already destroying pool')
1002 this.destroying
= true
1004 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
1005 await this.destroyWorkerNode(workerNodeKey
)
1008 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
1009 this.emitter
?.emitDestroy()
1010 this.readyEventEmitted
= false
1011 this.destroying
= false
1012 this.started
= false
1015 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
1016 await new Promise
<void>((resolve
, reject
) => {
1017 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1018 if (this.workerNodes
[workerNodeKey
] == null) {
1022 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1023 this.checkMessageWorkerId(message
)
1024 if (message
.kill
=== 'success') {
1026 } else if (message
.kill
=== 'failure') {
1029 `Kill message handling failed on worker ${message.workerId}`
1034 // FIXME: should be registered only once
1035 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1036 this.sendToWorker(workerNodeKey
, { kill
: true })
1041 * Terminates the worker node given its worker node key.
1043 * @param workerNodeKey - The worker node key.
1045 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1046 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1047 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1048 const workerNode
= this.workerNodes
[workerNodeKey
]
1049 await waitWorkerNodeEvents(
1053 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1054 getDefaultTasksQueueOptions(
1055 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1056 ).tasksFinishedTimeout
1058 await this.sendKillMessageToWorker(workerNodeKey
)
1059 await workerNode
.terminate()
1063 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1064 * Can be overridden.
1068 protected setupHook (): void {
1069 /* Intentionally empty */
1073 * Returns whether the worker is the main worker or not.
1075 * @returns `true` if the worker is the main worker, `false` otherwise.
1077 protected abstract isMain (): boolean
1080 * Hook executed before the worker task execution.
1081 * Can be overridden.
1083 * @param workerNodeKey - The worker node key.
1084 * @param task - The task to execute.
1086 protected beforeTaskExecutionHook (
1087 workerNodeKey
: number,
1090 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1091 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1092 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1093 ++workerUsage
.tasks
.executing
1094 updateWaitTimeWorkerUsage(
1095 this.workerChoiceStrategyContext
,
1101 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1102 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1103 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(task
.name
!) !=
1106 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1107 const taskFunctionWorkerUsage
= this.workerNodes
[
1109 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1110 ].getTaskFunctionWorkerUsage(task
.name
!)!
1111 ++taskFunctionWorkerUsage
.tasks
.executing
1112 updateWaitTimeWorkerUsage(
1113 this.workerChoiceStrategyContext
,
1114 taskFunctionWorkerUsage
,
1121 * Hook executed after the worker task execution.
1122 * Can be overridden.
1124 * @param workerNodeKey - The worker node key.
1125 * @param message - The received message.
1127 protected afterTaskExecutionHook (
1128 workerNodeKey
: number,
1129 message
: MessageValue
<Response
>
1131 let needWorkerChoiceStrategyUpdate
= false
1132 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1133 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1134 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1135 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1136 updateRunTimeWorkerUsage(
1137 this.workerChoiceStrategyContext
,
1141 updateEluWorkerUsage(
1142 this.workerChoiceStrategyContext
,
1146 needWorkerChoiceStrategyUpdate
= true
1149 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1150 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1151 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1152 message
.taskPerformance
!.name
1155 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1156 const taskFunctionWorkerUsage
= this.workerNodes
[
1158 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1159 ].getTaskFunctionWorkerUsage(message
.taskPerformance
!.name
)!
1160 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1161 updateRunTimeWorkerUsage(
1162 this.workerChoiceStrategyContext
,
1163 taskFunctionWorkerUsage
,
1166 updateEluWorkerUsage(
1167 this.workerChoiceStrategyContext
,
1168 taskFunctionWorkerUsage
,
1171 needWorkerChoiceStrategyUpdate
= true
1173 if (needWorkerChoiceStrategyUpdate
) {
1174 this.workerChoiceStrategyContext
?.update(workerNodeKey
)
1179 * Whether the worker node shall update its task function worker usage or not.
1181 * @param workerNodeKey - The worker node key.
1182 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1184 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1185 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1187 workerInfo
!= null &&
1188 Array.isArray(workerInfo
.taskFunctionNames
) &&
1189 workerInfo
.taskFunctionNames
.length
> 2
1194 * Chooses a worker node for the next task.
1196 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1198 * @returns The chosen worker node key
1200 private chooseWorkerNode (): number {
1201 if (this.shallCreateDynamicWorker()) {
1202 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1204 this.workerChoiceStrategyContext
?.getStrategyPolicy()
1205 .dynamicWorkerUsage
=== true
1207 return workerNodeKey
1210 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1211 return this.workerChoiceStrategyContext
!.execute()
1215 * Conditions for dynamic worker creation.
1217 * @returns Whether to create a dynamic worker or not.
1219 protected abstract shallCreateDynamicWorker (): boolean
1222 * Sends a message to worker given its worker node key.
1224 * @param workerNodeKey - The worker node key.
1225 * @param message - The message.
1226 * @param transferList - The optional array of transferable objects.
1228 protected abstract sendToWorker (
1229 workerNodeKey
: number,
1230 message
: MessageValue
<Data
>,
1231 transferList
?: TransferListItem
[]
1235 * Creates a new, completely set up worker node.
1237 * @returns New, completely set up worker node key.
1239 protected createAndSetupWorkerNode (): number {
1240 const workerNode
= this.createWorkerNode()
1241 workerNode
.registerWorkerEventHandler(
1243 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1245 workerNode
.registerWorkerEventHandler(
1247 this.opts
.messageHandler
?? EMPTY_FUNCTION
1249 workerNode
.registerWorkerEventHandler(
1251 this.opts
.errorHandler
?? EMPTY_FUNCTION
1253 workerNode
.registerOnceWorkerEventHandler('error', (error
: Error) => {
1254 workerNode
.info
.ready
= false
1255 this.emitter
?.emit(PoolEvents
.error
, error
)
1259 this.opts
.restartWorkerOnError
=== true
1261 if (workerNode
.info
.dynamic
) {
1262 this.createAndSetupDynamicWorkerNode()
1263 } else if (!this.startingMinimumNumberOfWorkers
) {
1264 this.startMinimumNumberOfWorkers()
1270 this.opts
.enableTasksQueue
=== true
1272 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
1274 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1275 workerNode
?.terminate().catch((error
: unknown
) => {
1276 this.emitter
?.emit(PoolEvents
.error
, error
)
1279 workerNode
.registerWorkerEventHandler(
1281 this.opts
.exitHandler
?? EMPTY_FUNCTION
1283 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1284 this.removeWorkerNode(workerNode
)
1287 !this.startingMinimumNumberOfWorkers
&&
1290 this.startMinimumNumberOfWorkers()
1293 const workerNodeKey
= this.addWorkerNode(workerNode
)
1294 this.afterWorkerNodeSetup(workerNodeKey
)
1295 return workerNodeKey
1299 * Creates a new, completely set up dynamic worker node.
1301 * @returns New, completely set up dynamic worker node key.
1303 protected createAndSetupDynamicWorkerNode (): number {
1304 const workerNodeKey
= this.createAndSetupWorkerNode()
1305 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1306 this.checkMessageWorkerId(message
)
1307 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1310 const workerUsage
= this.workerNodes
[localWorkerNodeKey
]?.usage
1311 // Kill message received from worker
1313 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1314 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1315 ((this.opts
.enableTasksQueue
=== false &&
1316 workerUsage
.tasks
.executing
=== 0) ||
1317 (this.opts
.enableTasksQueue
=== true &&
1318 workerUsage
.tasks
.executing
=== 0 &&
1319 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1321 // Flag the worker node as not ready immediately
1322 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1323 this.destroyWorkerNode(localWorkerNodeKey
).catch((error
: unknown
) => {
1324 this.emitter
?.emit(PoolEvents
.error
, error
)
1328 this.sendToWorker(workerNodeKey
, {
1331 if (this.taskFunctions
.size
> 0) {
1332 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1333 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1334 taskFunctionOperation
: 'add',
1336 taskFunction
: taskFunction
.toString()
1337 }).catch((error
: unknown
) => {
1338 this.emitter
?.emit(PoolEvents
.error
, error
)
1342 const workerNode
= this.workerNodes
[workerNodeKey
]
1343 workerNode
.info
.dynamic
= true
1345 this.workerChoiceStrategyContext
?.getStrategyPolicy()
1346 .dynamicWorkerReady
=== true ||
1347 this.workerChoiceStrategyContext
?.getStrategyPolicy()
1348 .dynamicWorkerUsage
=== true
1350 workerNode
.info
.ready
= true
1352 this.checkAndEmitDynamicWorkerCreationEvents()
1353 return workerNodeKey
1357 * Registers a listener callback on the worker given its worker node key.
1359 * @param workerNodeKey - The worker node key.
1360 * @param listener - The message listener callback.
1362 protected abstract registerWorkerMessageListener
<
1363 Message
extends Data
| Response
1365 workerNodeKey
: number,
1366 listener
: (message
: MessageValue
<Message
>) => void
1370 * Registers once a listener callback on the worker given its worker node key.
1372 * @param workerNodeKey - The worker node key.
1373 * @param listener - The message listener callback.
1375 protected abstract registerOnceWorkerMessageListener
<
1376 Message
extends Data
| Response
1378 workerNodeKey
: number,
1379 listener
: (message
: MessageValue
<Message
>) => void
1383 * Deregisters a listener callback on the worker given its worker node key.
1385 * @param workerNodeKey - The worker node key.
1386 * @param listener - The message listener callback.
1388 protected abstract deregisterWorkerMessageListener
<
1389 Message
extends Data
| Response
1391 workerNodeKey
: number,
1392 listener
: (message
: MessageValue
<Message
>) => void
1396 * Method hooked up after a worker node has been newly created.
1397 * Can be overridden.
1399 * @param workerNodeKey - The newly created worker node key.
1401 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1402 // Listen to worker messages.
1403 this.registerWorkerMessageListener(
1405 this.workerMessageListener
1407 // Send the startup message to worker.
1408 this.sendStartupMessageToWorker(workerNodeKey
)
1409 // Send the statistics message to worker.
1410 this.sendStatisticsMessageToWorker(workerNodeKey
)
1411 if (this.opts
.enableTasksQueue
=== true) {
1412 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1413 this.workerNodes
[workerNodeKey
].on(
1415 this.handleWorkerNodeIdleEvent
1418 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1419 this.workerNodes
[workerNodeKey
].on(
1421 this.handleWorkerNodeBackPressureEvent
1428 * Sends the startup message to worker given its worker node key.
1430 * @param workerNodeKey - The worker node key.
1432 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1435 * Sends the statistics message to worker given its worker node key.
1437 * @param workerNodeKey - The worker node key.
1439 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1440 this.sendToWorker(workerNodeKey
, {
1443 this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
1444 .runTime
.aggregate
?? false,
1446 this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements().elu
1452 private cannotStealTask (): boolean {
1453 return this.workerNodes
.length
<= 1 || this.info
.queuedTasks
=== 0
1456 private handleTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1457 if (this.shallExecuteTask(workerNodeKey
)) {
1458 this.executeTask(workerNodeKey
, task
)
1460 this.enqueueTask(workerNodeKey
, task
)
1464 private redistributeQueuedTasks (workerNodeKey
: number): void {
1465 if (workerNodeKey
=== -1 || this.cannotStealTask()) {
1468 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1469 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1470 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1471 return workerNode
.info
.ready
&&
1472 workerNode
.usage
.tasks
.queued
<
1473 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1480 destinationWorkerNodeKey
,
1481 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1482 this.dequeueTask(workerNodeKey
)!
1487 private updateTaskStolenStatisticsWorkerUsage (
1488 workerNodeKey
: number,
1491 const workerNode
= this.workerNodes
[workerNodeKey
]
1492 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1493 if (workerNode
?.usage
!= null) {
1494 ++workerNode
.usage
.tasks
.stolen
1497 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1498 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1500 const taskFunctionWorkerUsage
=
1501 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1502 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1503 ++taskFunctionWorkerUsage
.tasks
.stolen
1507 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1508 workerNodeKey
: number
1510 const workerNode
= this.workerNodes
[workerNodeKey
]
1511 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1512 if (workerNode
?.usage
!= null) {
1513 ++workerNode
.usage
.tasks
.sequentiallyStolen
1517 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1518 workerNodeKey
: number,
1521 const workerNode
= this.workerNodes
[workerNodeKey
]
1523 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1524 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1526 const taskFunctionWorkerUsage
=
1527 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1528 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1529 ++taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
1533 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1534 workerNodeKey
: number
1536 const workerNode
= this.workerNodes
[workerNodeKey
]
1537 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1538 if (workerNode
?.usage
!= null) {
1539 workerNode
.usage
.tasks
.sequentiallyStolen
= 0
1543 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1544 workerNodeKey
: number,
1547 const workerNode
= this.workerNodes
[workerNodeKey
]
1549 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1550 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1552 const taskFunctionWorkerUsage
=
1553 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1554 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1555 taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
= 0
1559 private readonly handleWorkerNodeIdleEvent
= (
1560 eventDetail
: WorkerNodeEventDetail
,
1561 previousStolenTask
?: Task
<Data
>
1563 const { workerNodeKey
} = eventDetail
1564 if (workerNodeKey
== null) {
1566 "WorkerNode event detail 'workerNodeKey' property must be defined"
1569 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1571 this.cannotStealTask() ||
1572 (this.info
.stealingWorkerNodes
?? 0) >
1573 Math.floor(this.workerNodes
.length
/ 2)
1575 if (workerInfo
!= null && previousStolenTask
!= null) {
1576 workerInfo
.stealing
= false
1580 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1582 workerInfo
!= null &&
1583 previousStolenTask
!= null &&
1584 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1585 (workerNodeTasksUsage
.executing
> 0 ||
1586 this.tasksQueueSize(workerNodeKey
) > 0)
1588 workerInfo
.stealing
= false
1589 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1590 for (const taskName
of workerInfo
.taskFunctionNames
!) {
1591 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1596 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1599 if (workerInfo
== null) {
1601 `Worker node with key '${workerNodeKey}' not found in pool`
1604 workerInfo
.stealing
= true
1605 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1607 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1610 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1611 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1613 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1614 ].getTaskFunctionWorkerUsage(stolenTask
.name
!)!.tasks
1616 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1617 (previousStolenTask
!= null &&
1618 previousStolenTask
.name
=== stolenTask
.name
&&
1619 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1621 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1623 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1627 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1629 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1634 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1636 this.handleWorkerNodeIdleEvent(eventDetail
, stolenTask
)
1639 .catch((error
: unknown
) => {
1640 this.emitter
?.emit(PoolEvents
.error
, error
)
1644 private readonly workerNodeStealTask
= (
1645 workerNodeKey
: number
1646 ): Task
<Data
> | undefined => {
1647 const workerNodes
= this.workerNodes
1650 (workerNodeA
, workerNodeB
) =>
1651 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1653 const sourceWorkerNode
= workerNodes
.find(
1654 (sourceWorkerNode
, sourceWorkerNodeKey
) =>
1655 sourceWorkerNode
.info
.ready
&&
1656 !sourceWorkerNode
.info
.stealing
&&
1657 sourceWorkerNodeKey
!== workerNodeKey
&&
1658 sourceWorkerNode
.usage
.tasks
.queued
> 0
1660 if (sourceWorkerNode
!= null) {
1661 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1662 const task
= sourceWorkerNode
.popTask()!
1663 this.handleTask(workerNodeKey
, task
)
1664 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1665 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1666 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey
, task
.name
!)
1671 private readonly handleWorkerNodeBackPressureEvent
= (
1672 eventDetail
: WorkerNodeEventDetail
1675 this.cannotStealTask() ||
1676 (this.info
.stealingWorkerNodes
?? 0) >
1677 Math.floor(this.workerNodes
.length
/ 2)
1681 const { workerId
} = eventDetail
1682 const sizeOffset
= 1
1683 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1684 if (this.opts
.tasksQueueOptions
!.size
! <= sizeOffset
) {
1687 const sourceWorkerNode
=
1688 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1689 const workerNodes
= this.workerNodes
1692 (workerNodeA
, workerNodeB
) =>
1693 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1695 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1697 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1698 workerNode
.info
.ready
&&
1699 !workerNode
.info
.stealing
&&
1700 workerNode
.info
.id
!== workerId
&&
1701 workerNode
.usage
.tasks
.queued
<
1702 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1703 this.opts
.tasksQueueOptions
!.size
! - sizeOffset
1705 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1706 if (workerInfo
== null) {
1708 `Worker node with key '${workerNodeKey}' not found in pool`
1711 workerInfo
.stealing
= true
1712 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1713 const task
= sourceWorkerNode
.popTask()!
1714 this.handleTask(workerNodeKey
, task
)
1715 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1716 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey
, task
.name
!)
1717 workerInfo
.stealing
= false
1723 * This method is the message listener registered on each worker.
1725 protected readonly workerMessageListener
= (
1726 message
: MessageValue
<Response
>
1728 this.checkMessageWorkerId(message
)
1729 const { workerId
, ready
, taskId
, taskFunctionNames
} = message
1730 if (ready
!= null && taskFunctionNames
!= null) {
1731 // Worker ready response received from worker
1732 this.handleWorkerReadyResponse(message
)
1733 } else if (taskId
!= null) {
1734 // Task execution response received from worker
1735 this.handleTaskExecutionResponse(message
)
1736 } else if (taskFunctionNames
!= null) {
1737 // Task function names message received from worker
1738 const workerInfo
= this.getWorkerInfo(
1739 this.getWorkerNodeKeyByWorkerId(workerId
)
1741 if (workerInfo
!= null) {
1742 workerInfo
.taskFunctionNames
= taskFunctionNames
1747 private checkAndEmitReadyEvent (): void {
1748 if (!this.readyEventEmitted
&& this.ready
) {
1749 this.emitter
?.emit(PoolEvents
.ready
, this.info
)
1750 this.readyEventEmitted
= true
1754 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1755 const { workerId
, ready
, taskFunctionNames
} = message
1756 if (ready
== null || !ready
) {
1757 throw new Error(`Worker ${workerId} failed to initialize`)
1760 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1761 workerNode
.info
.ready
= ready
1762 workerNode
.info
.taskFunctionNames
= taskFunctionNames
1763 this.checkAndEmitReadyEvent()
1766 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1767 const { workerId
, taskId
, workerError
, data
} = message
1768 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1769 const promiseResponse
= this.promiseResponseMap
.get(taskId
!)
1770 if (promiseResponse
!= null) {
1771 const { resolve
, reject
, workerNodeKey
, asyncResource
} = promiseResponse
1772 const workerNode
= this.workerNodes
[workerNodeKey
]
1773 if (workerError
!= null) {
1774 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1775 asyncResource
!= null
1776 ? asyncResource
.runInAsyncScope(
1781 : reject(workerError
.message
)
1783 asyncResource
!= null
1784 ? asyncResource
.runInAsyncScope(resolve
, this.emitter
, data
)
1785 : resolve(data
as Response
)
1787 asyncResource
?.emitDestroy()
1788 this.afterTaskExecutionHook(workerNodeKey
, message
)
1789 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1790 this.promiseResponseMap
.delete(taskId
!)
1791 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1792 workerNode
?.emit('taskFinished', taskId
)
1794 this.opts
.enableTasksQueue
=== true &&
1796 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1799 const workerNodeTasksUsage
= workerNode
.usage
.tasks
1801 this.tasksQueueSize(workerNodeKey
) > 0 &&
1802 workerNodeTasksUsage
.executing
<
1803 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1804 this.opts
.tasksQueueOptions
!.concurrency
!
1806 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1807 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
1810 workerNodeTasksUsage
.executing
=== 0 &&
1811 this.tasksQueueSize(workerNodeKey
) === 0 &&
1812 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1814 workerNode
.emit('idle', {
1823 private checkAndEmitTaskExecutionEvents (): void {
1825 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1829 private checkAndEmitTaskQueuingEvents (): void {
1830 if (this.hasBackPressure()) {
1831 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1836 * Emits dynamic worker creation events.
1838 protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1841 * Gets the worker information given its worker node key.
1843 * @param workerNodeKey - The worker node key.
1844 * @returns The worker information.
1846 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
| undefined {
1847 return this.workerNodes
[workerNodeKey
]?.info
1851 * Creates a worker node.
1853 * @returns The created worker node.
1855 private createWorkerNode (): IWorkerNode
<Worker
, Data
> {
1856 const workerNode
= new WorkerNode
<Worker
, Data
>(
1861 workerOptions
: this.opts
.workerOptions
,
1862 tasksQueueBackPressureSize
:
1863 this.opts
.tasksQueueOptions
?.size
??
1864 getDefaultTasksQueueOptions(
1865 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1869 // Flag the worker node as ready at pool startup.
1870 if (this.starting
) {
1871 workerNode
.info
.ready
= true
1877 * Adds the given worker node in the pool worker nodes.
1879 * @param workerNode - The worker node.
1880 * @returns The added worker node key.
1881 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1883 private addWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): number {
1884 this.workerNodes
.push(workerNode
)
1885 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
1886 if (workerNodeKey
=== -1) {
1887 throw new Error('Worker added not found in worker nodes')
1889 return workerNodeKey
1892 private checkAndEmitEmptyEvent (): void {
1894 this.emitter
?.emit(PoolEvents
.empty
, this.info
)
1895 this.readyEventEmitted
= false
1900 * Removes the worker node from the pool worker nodes.
1902 * @param workerNode - The worker node.
1904 private removeWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): void {
1905 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
1906 if (workerNodeKey
!== -1) {
1907 this.workerNodes
.splice(workerNodeKey
, 1)
1908 this.workerChoiceStrategyContext
?.remove(workerNodeKey
)
1910 this.checkAndEmitEmptyEvent()
1913 protected flagWorkerNodeAsNotReady (workerNodeKey
: number): void {
1914 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1915 if (workerInfo
!= null) {
1916 workerInfo
.ready
= false
1920 private hasBackPressure (): boolean {
1922 this.opts
.enableTasksQueue
=== true &&
1923 this.workerNodes
.findIndex(
1924 workerNode
=> !workerNode
.hasBackPressure()
1930 * Executes the given task on the worker given its worker node key.
1932 * @param workerNodeKey - The worker node key.
1933 * @param task - The task to execute.
1935 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1936 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1937 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1938 this.checkAndEmitTaskExecutionEvents()
1941 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1942 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1943 this.checkAndEmitTaskQueuingEvents()
1944 return tasksQueueSize
1947 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1948 return this.workerNodes
[workerNodeKey
].dequeueTask()
1951 private tasksQueueSize (workerNodeKey
: number): number {
1952 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1955 protected flushTasksQueue (workerNodeKey
: number): number {
1956 let flushedTasks
= 0
1957 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1958 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1959 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
1962 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1966 private flushTasksQueues (): void {
1967 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1968 this.flushTasksQueue(workerNodeKey
)