1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
5 import { AsyncResource
} from
'node:async_hooks'
8 PromiseResponseWrapper
,
10 } from
'../utility-types'
24 import { KillBehaviors
} from
'../worker/worker-options'
25 import type { TaskFunction
} from
'../worker/task-functions'
33 type TasksQueueOptions
40 WorkerNodeEventDetail
,
46 WorkerChoiceStrategies
,
47 type WorkerChoiceStrategy
,
48 type WorkerChoiceStrategyOptions
49 } from
'./selection-strategies/selection-strategies-types'
50 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
51 import { version
} from
'./version'
52 import { WorkerNode
} from
'./worker-node'
55 checkValidTasksQueueOptions
,
56 checkValidWorkerChoiceStrategy
,
57 getDefaultTasksQueueOptions
,
59 updateRunTimeWorkerUsage
,
60 updateTaskStatisticsWorkerUsage
,
61 updateWaitTimeWorkerUsage
,
66 * Base class that implements some shared logic for all poolifier pools.
68 * @typeParam Worker - Type of worker which manages this pool.
69 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
70 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
72 export abstract class AbstractPool
<
73 Worker
extends IWorker
,
76 > implements IPool
<Worker
, Data
, Response
> {
78 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
81 public emitter
?: EventEmitterAsyncResource
84 * The task execution response promise map:
85 * - `key`: The message id of each submitted task.
86 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
88 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
90 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
91 new Map
<string, PromiseResponseWrapper
<Response
>>()
94 * Worker choice strategy context referencing a worker choice algorithm implementation.
96 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
103 * The task functions added at runtime map:
104 * - `key`: The task function name.
105 * - `value`: The task function itself.
107 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
110 * Whether the pool is started or not.
112 private started
: boolean
114 * Whether the pool is starting or not.
116 private starting
: boolean
118 * Whether the pool is destroying or not.
120 private destroying
: boolean
122 * Whether the pool ready event has been emitted or not.
124 private readyEventEmitted
: boolean
126 * The start timestamp of the pool.
128 private readonly startTimestamp
131 * Constructs a new poolifier pool.
133 * @param minimumNumberOfWorkers - Minimum number of workers that this pool should manage.
134 * @param filePath - Path to the worker file.
135 * @param opts - Options for the pool.
136 * @param maximumNumberOfWorkers - Maximum number of workers that this pool should manage.
139 protected readonly minimumNumberOfWorkers
: number,
140 protected readonly filePath
: string,
141 protected readonly opts
: PoolOptions
<Worker
>,
142 protected readonly maximumNumberOfWorkers
?: number
144 if (!this.isMain()) {
146 'Cannot start a pool from a worker with the same type as the pool'
150 checkFilePath(this.filePath
)
151 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
152 this.checkPoolOptions(this.opts
)
154 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
155 this.executeTask
= this.executeTask
.bind(this)
156 this.enqueueTask
= this.enqueueTask
.bind(this)
158 if (this.opts
.enableEvents
=== true) {
159 this.initializeEventEmitter()
161 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
167 this.opts
.workerChoiceStrategy
,
168 this.opts
.workerChoiceStrategyOptions
173 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
176 this.starting
= false
177 this.destroying
= false
178 this.readyEventEmitted
= false
179 if (this.opts
.startWorkers
=== true) {
183 this.startTimestamp
= performance
.now()
186 private checkPoolType (): void {
187 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
189 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
194 private checkMinimumNumberOfWorkers (minimumNumberOfWorkers
: number): void {
195 if (minimumNumberOfWorkers
== null) {
197 'Cannot instantiate a pool without specifying the number of workers'
199 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
201 'Cannot instantiate a pool with a non safe integer number of workers'
203 } else if (minimumNumberOfWorkers
< 0) {
204 throw new RangeError(
205 'Cannot instantiate a pool with a negative number of workers'
207 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
208 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
212 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
213 if (isPlainObject(opts
)) {
214 this.opts
.startWorkers
= opts
.startWorkers
?? true
215 checkValidWorkerChoiceStrategy(
216 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
218 this.opts
.workerChoiceStrategy
=
219 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
220 this.checkValidWorkerChoiceStrategyOptions(
221 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
223 if (opts
.workerChoiceStrategyOptions
!= null) {
224 this.opts
.workerChoiceStrategyOptions
= opts
.workerChoiceStrategyOptions
226 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
227 this.opts
.enableEvents
= opts
.enableEvents
?? true
228 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
229 if (this.opts
.enableTasksQueue
) {
230 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
231 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
232 opts
.tasksQueueOptions
as TasksQueueOptions
236 throw new TypeError('Invalid pool options: must be a plain object')
240 private checkValidWorkerChoiceStrategyOptions (
241 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
244 workerChoiceStrategyOptions
!= null &&
245 !isPlainObject(workerChoiceStrategyOptions
)
248 'Invalid worker choice strategy options: must be a plain object'
252 workerChoiceStrategyOptions
?.weights
!= null &&
253 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
254 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
257 'Invalid worker choice strategy options: must have a weight for each worker node'
261 workerChoiceStrategyOptions
?.measurement
!= null &&
262 !Object.values(Measurements
).includes(
263 workerChoiceStrategyOptions
.measurement
267 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
272 private initializeEventEmitter (): void {
273 this.emitter
= new EventEmitterAsyncResource({
274 name
: `poolifier:${this.type}-${this.worker}-pool`
279 public get
info (): PoolInfo
{
284 started
: this.started
,
286 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
287 minSize
: this.minimumNumberOfWorkers
,
288 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
289 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
290 .runTime
.aggregate
&&
291 this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
292 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
293 workerNodes
: this.workerNodes
.length
,
294 idleWorkerNodes
: this.workerNodes
.reduce(
295 (accumulator
, workerNode
) =>
296 workerNode
.usage
.tasks
.executing
=== 0
301 busyWorkerNodes
: this.workerNodes
.reduce(
302 (accumulator
, _workerNode
, workerNodeKey
) =>
303 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
306 executedTasks
: this.workerNodes
.reduce(
307 (accumulator
, workerNode
) =>
308 accumulator
+ workerNode
.usage
.tasks
.executed
,
311 executingTasks
: this.workerNodes
.reduce(
312 (accumulator
, workerNode
) =>
313 accumulator
+ workerNode
.usage
.tasks
.executing
,
316 ...(this.opts
.enableTasksQueue
=== true && {
317 queuedTasks
: this.workerNodes
.reduce(
318 (accumulator
, workerNode
) =>
319 accumulator
+ workerNode
.usage
.tasks
.queued
,
323 ...(this.opts
.enableTasksQueue
=== true && {
324 maxQueuedTasks
: this.workerNodes
.reduce(
325 (accumulator
, workerNode
) =>
326 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
330 ...(this.opts
.enableTasksQueue
=== true && {
331 backPressure
: this.hasBackPressure()
333 ...(this.opts
.enableTasksQueue
=== true && {
334 stolenTasks
: this.workerNodes
.reduce(
335 (accumulator
, workerNode
) =>
336 accumulator
+ workerNode
.usage
.tasks
.stolen
,
340 failedTasks
: this.workerNodes
.reduce(
341 (accumulator
, workerNode
) =>
342 accumulator
+ workerNode
.usage
.tasks
.failed
,
345 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
346 .runTime
.aggregate
&& {
350 ...this.workerNodes
.map(
351 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
357 ...this.workerNodes
.map(
358 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
362 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
363 .runTime
.average
&& {
366 this.workerNodes
.reduce
<number[]>(
367 (accumulator
, workerNode
) =>
368 accumulator
.concat(workerNode
.usage
.runTime
.history
),
374 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
378 this.workerNodes
.reduce
<number[]>(
379 (accumulator
, workerNode
) =>
380 accumulator
.concat(workerNode
.usage
.runTime
.history
),
388 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
389 .waitTime
.aggregate
&& {
393 ...this.workerNodes
.map(
394 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
400 ...this.workerNodes
.map(
401 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
405 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
406 .waitTime
.average
&& {
409 this.workerNodes
.reduce
<number[]>(
410 (accumulator
, workerNode
) =>
411 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
417 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
418 .waitTime
.median
&& {
421 this.workerNodes
.reduce
<number[]>(
422 (accumulator
, workerNode
) =>
423 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
435 * The pool readiness boolean status.
437 private get
ready (): boolean {
439 this.workerNodes
.reduce(
440 (accumulator
, workerNode
) =>
441 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
445 ) >= this.minimumNumberOfWorkers
450 * The approximate pool utilization.
452 * @returns The pool utilization.
454 private get
utilization (): number {
455 const poolTimeCapacity
=
456 (performance
.now() - this.startTimestamp
) *
457 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
458 const totalTasksRunTime
= this.workerNodes
.reduce(
459 (accumulator
, workerNode
) =>
460 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
463 const totalTasksWaitTime
= this.workerNodes
.reduce(
464 (accumulator
, workerNode
) =>
465 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
468 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
474 * If it is `'dynamic'`, it provides the `max` property.
476 protected abstract get
type (): PoolType
481 protected abstract get
worker (): WorkerType
484 * Checks if the worker id sent in the received message from a worker is valid.
486 * @param message - The received message.
487 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
489 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
490 if (message
.workerId
== null) {
491 throw new Error('Worker message received without worker id')
492 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
494 `Worker message received from unknown worker '${message.workerId}'`
500 * Gets the worker node key given its worker id.
502 * @param workerId - The worker id.
503 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
505 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
506 return this.workerNodes
.findIndex(
507 workerNode
=> workerNode
.info
.id
=== workerId
512 public setWorkerChoiceStrategy (
513 workerChoiceStrategy
: WorkerChoiceStrategy
,
514 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
516 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
517 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
518 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
519 this.opts
.workerChoiceStrategy
521 if (workerChoiceStrategyOptions
!= null) {
522 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
524 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
525 workerNode
.resetUsage()
526 this.sendStatisticsMessageToWorker(workerNodeKey
)
531 public setWorkerChoiceStrategyOptions (
532 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
534 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
535 if (workerChoiceStrategyOptions
!= null) {
536 this.opts
.workerChoiceStrategyOptions
= workerChoiceStrategyOptions
538 this.workerChoiceStrategyContext
.setOptions(
540 this.opts
.workerChoiceStrategyOptions
545 public enableTasksQueue (
547 tasksQueueOptions
?: TasksQueueOptions
549 if (this.opts
.enableTasksQueue
=== true && !enable
) {
550 this.unsetTaskStealing()
551 this.unsetTasksStealingOnBackPressure()
552 this.flushTasksQueues()
554 this.opts
.enableTasksQueue
= enable
555 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
559 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
560 if (this.opts
.enableTasksQueue
=== true) {
561 checkValidTasksQueueOptions(tasksQueueOptions
)
562 this.opts
.tasksQueueOptions
=
563 this.buildTasksQueueOptions(tasksQueueOptions
)
564 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
565 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
566 this.unsetTaskStealing()
567 this.setTaskStealing()
569 this.unsetTaskStealing()
571 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
572 this.unsetTasksStealingOnBackPressure()
573 this.setTasksStealingOnBackPressure()
575 this.unsetTasksStealingOnBackPressure()
577 } else if (this.opts
.tasksQueueOptions
!= null) {
578 delete this.opts
.tasksQueueOptions
582 private buildTasksQueueOptions (
583 tasksQueueOptions
: TasksQueueOptions
584 ): TasksQueueOptions
{
586 ...getDefaultTasksQueueOptions(
587 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
593 private setTasksQueueSize (size
: number): void {
594 for (const workerNode
of this.workerNodes
) {
595 workerNode
.tasksQueueBackPressureSize
= size
599 private setTaskStealing (): void {
600 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
601 this.workerNodes
[workerNodeKey
].on(
603 this.handleIdleWorkerNodeEvent
608 private unsetTaskStealing (): void {
609 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
610 this.workerNodes
[workerNodeKey
].off(
612 this.handleIdleWorkerNodeEvent
617 private setTasksStealingOnBackPressure (): void {
618 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
619 this.workerNodes
[workerNodeKey
].on(
621 this.handleBackPressureEvent
626 private unsetTasksStealingOnBackPressure (): void {
627 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
628 this.workerNodes
[workerNodeKey
].off(
630 this.handleBackPressureEvent
636 * Whether the pool is full or not.
638 * The pool filling boolean status.
640 protected get
full (): boolean {
642 this.workerNodes
.length
>=
643 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
648 * Whether the pool is busy or not.
650 * The pool busyness boolean status.
652 protected abstract get
busy (): boolean
655 * Whether worker nodes are executing concurrently their tasks quota or not.
657 * @returns Worker nodes busyness boolean status.
659 protected internalBusy (): boolean {
660 if (this.opts
.enableTasksQueue
=== true) {
662 this.workerNodes
.findIndex(
664 workerNode
.info
.ready
&&
665 workerNode
.usage
.tasks
.executing
<
666 (this.opts
.tasksQueueOptions
?.concurrency
as number)
671 this.workerNodes
.findIndex(
673 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
678 private isWorkerNodeBusy (workerNodeKey
: number): boolean {
679 if (this.opts
.enableTasksQueue
=== true) {
681 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
>=
682 (this.opts
.tasksQueueOptions
?.concurrency
as number)
685 return this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
> 0
688 private async sendTaskFunctionOperationToWorker (
689 workerNodeKey
: number,
690 message
: MessageValue
<Data
>
691 ): Promise
<boolean> {
692 return await new Promise
<boolean>((resolve
, reject
) => {
693 const taskFunctionOperationListener
= (
694 message
: MessageValue
<Response
>
696 this.checkMessageWorkerId(message
)
697 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
699 message
.taskFunctionOperationStatus
!= null &&
700 message
.workerId
=== workerId
702 if (message
.taskFunctionOperationStatus
) {
704 } else if (!message
.taskFunctionOperationStatus
) {
707 `Task function operation '${
708 message.taskFunctionOperation as string
709 }' failed on worker ${message.workerId} with error: '${
710 message.workerError?.message as string
715 this.deregisterWorkerMessageListener(
716 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
717 taskFunctionOperationListener
721 this.registerWorkerMessageListener(
723 taskFunctionOperationListener
725 this.sendToWorker(workerNodeKey
, message
)
729 private async sendTaskFunctionOperationToWorkers (
730 message
: MessageValue
<Data
>
731 ): Promise
<boolean> {
732 return await new Promise
<boolean>((resolve
, reject
) => {
733 const responsesReceived
= new Array<MessageValue
<Response
>>()
734 const taskFunctionOperationsListener
= (
735 message
: MessageValue
<Response
>
737 this.checkMessageWorkerId(message
)
738 if (message
.taskFunctionOperationStatus
!= null) {
739 responsesReceived
.push(message
)
740 if (responsesReceived
.length
=== this.workerNodes
.length
) {
742 responsesReceived
.every(
743 message
=> message
.taskFunctionOperationStatus
=== true
748 responsesReceived
.some(
749 message
=> message
.taskFunctionOperationStatus
=== false
752 const errorResponse
= responsesReceived
.find(
753 response
=> response
.taskFunctionOperationStatus
=== false
757 `Task function operation '${
758 message.taskFunctionOperation as string
759 }' failed on worker ${
760 errorResponse?.workerId as number
762 errorResponse?.workerError?.message as string
767 this.deregisterWorkerMessageListener(
768 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
769 taskFunctionOperationsListener
774 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
775 this.registerWorkerMessageListener(
777 taskFunctionOperationsListener
779 this.sendToWorker(workerNodeKey
, message
)
785 public hasTaskFunction (name
: string): boolean {
786 for (const workerNode
of this.workerNodes
) {
788 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
789 workerNode
.info
.taskFunctionNames
.includes(name
)
798 public async addTaskFunction (
800 fn
: TaskFunction
<Data
, Response
>
801 ): Promise
<boolean> {
802 if (typeof name
!== 'string') {
803 throw new TypeError('name argument must be a string')
805 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
806 throw new TypeError('name argument must not be an empty string')
808 if (typeof fn
!== 'function') {
809 throw new TypeError('fn argument must be a function')
811 const opResult
= await this.sendTaskFunctionOperationToWorkers({
812 taskFunctionOperation
: 'add',
813 taskFunctionName
: name
,
814 taskFunction
: fn
.toString()
816 this.taskFunctions
.set(name
, fn
)
821 public async removeTaskFunction (name
: string): Promise
<boolean> {
822 if (!this.taskFunctions
.has(name
)) {
824 'Cannot remove a task function not handled on the pool side'
827 const opResult
= await this.sendTaskFunctionOperationToWorkers({
828 taskFunctionOperation
: 'remove',
829 taskFunctionName
: name
831 this.deleteTaskFunctionWorkerUsages(name
)
832 this.taskFunctions
.delete(name
)
837 public listTaskFunctionNames (): string[] {
838 for (const workerNode
of this.workerNodes
) {
840 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
841 workerNode
.info
.taskFunctionNames
.length
> 0
843 return workerNode
.info
.taskFunctionNames
850 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
851 return await this.sendTaskFunctionOperationToWorkers({
852 taskFunctionOperation
: 'default',
853 taskFunctionName
: name
857 private deleteTaskFunctionWorkerUsages (name
: string): void {
858 for (const workerNode
of this.workerNodes
) {
859 workerNode
.deleteTaskFunctionWorkerUsage(name
)
863 private shallExecuteTask (workerNodeKey
: number): boolean {
865 this.tasksQueueSize(workerNodeKey
) === 0 &&
866 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
867 (this.opts
.tasksQueueOptions
?.concurrency
as number)
872 public async execute (
875 transferList
?: TransferListItem
[]
876 ): Promise
<Response
> {
877 return await new Promise
<Response
>((resolve
, reject
) => {
879 reject(new Error('Cannot execute a task on not started pool'))
882 if (this.destroying
) {
883 reject(new Error('Cannot execute a task on destroying pool'))
886 if (name
!= null && typeof name
!== 'string') {
887 reject(new TypeError('name argument must be a string'))
892 typeof name
=== 'string' &&
893 name
.trim().length
=== 0
895 reject(new TypeError('name argument must not be an empty string'))
898 if (transferList
!= null && !Array.isArray(transferList
)) {
899 reject(new TypeError('transferList argument must be an array'))
902 const timestamp
= performance
.now()
903 const workerNodeKey
= this.chooseWorkerNode()
904 const task
: Task
<Data
> = {
905 name
: name
?? DEFAULT_TASK_NAME
,
906 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
907 data
: data
?? ({} as Data
),
912 this.promiseResponseMap
.set(task
.taskId
as string, {
916 ...(this.emitter
!= null && {
917 asyncResource
: new AsyncResource('poolifier:task', {
918 triggerAsyncId
: this.emitter
.asyncId
,
919 requireManualDestroy
: true
924 this.opts
.enableTasksQueue
=== false ||
925 (this.opts
.enableTasksQueue
=== true &&
926 this.shallExecuteTask(workerNodeKey
))
928 this.executeTask(workerNodeKey
, task
)
930 this.enqueueTask(workerNodeKey
, task
)
936 public start (): void {
938 throw new Error('Cannot start an already started pool')
941 throw new Error('Cannot start an already starting pool')
943 if (this.destroying
) {
944 throw new Error('Cannot start a destroying pool')
948 this.workerNodes
.reduce(
949 (accumulator
, workerNode
) =>
950 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
952 ) < this.minimumNumberOfWorkers
954 this.createAndSetupWorkerNode()
956 this.starting
= false
961 public async destroy (): Promise
<void> {
963 throw new Error('Cannot destroy an already destroyed pool')
966 throw new Error('Cannot destroy an starting pool')
968 if (this.destroying
) {
969 throw new Error('Cannot destroy an already destroying pool')
971 this.destroying
= true
973 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
974 await this.destroyWorkerNode(workerNodeKey
)
977 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
978 this.emitter
?.emitDestroy()
979 this.emitter
?.removeAllListeners()
980 this.readyEventEmitted
= false
981 this.destroying
= false
985 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
986 await new Promise
<void>((resolve
, reject
) => {
987 if (workerNodeKey
< 0 || workerNodeKey
>= this.workerNodes
.length
) {
988 reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
991 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
992 this.checkMessageWorkerId(message
)
993 if (message
.kill
=== 'success') {
995 } else if (message
.kill
=== 'failure') {
998 `Kill message handling failed on worker ${
999 message.workerId as number
1005 // FIXME: should be registered only once
1006 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1007 this.sendToWorker(workerNodeKey
, { kill
: true })
1012 * Terminates the worker node given its worker node key.
1014 * @param workerNodeKey - The worker node key.
1016 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1017 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1018 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1019 const workerNode
= this.workerNodes
[workerNodeKey
]
1020 await waitWorkerNodeEvents(
1024 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1025 getDefaultTasksQueueOptions(
1026 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1027 ).tasksFinishedTimeout
1029 await this.sendKillMessageToWorker(workerNodeKey
)
1030 await workerNode
.terminate()
1034 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1035 * Can be overridden.
1039 protected setupHook (): void {
1040 /* Intentionally empty */
1044 * Should return whether the worker is the main worker or not.
1046 protected abstract isMain (): boolean
1049 * Hook executed before the worker task execution.
1050 * Can be overridden.
1052 * @param workerNodeKey - The worker node key.
1053 * @param task - The task to execute.
1055 protected beforeTaskExecutionHook (
1056 workerNodeKey
: number,
1059 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1060 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1061 ++workerUsage
.tasks
.executing
1062 updateWaitTimeWorkerUsage(
1063 this.workerChoiceStrategyContext
,
1069 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1070 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1074 const taskFunctionWorkerUsage
= this.workerNodes
[
1076 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1077 ++taskFunctionWorkerUsage
.tasks
.executing
1078 updateWaitTimeWorkerUsage(
1079 this.workerChoiceStrategyContext
,
1080 taskFunctionWorkerUsage
,
1087 * Hook executed after the worker task execution.
1088 * Can be overridden.
1090 * @param workerNodeKey - The worker node key.
1091 * @param message - The received message.
1093 protected afterTaskExecutionHook (
1094 workerNodeKey
: number,
1095 message
: MessageValue
<Response
>
1097 let needWorkerChoiceStrategyUpdate
= false
1098 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1099 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1100 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1101 updateRunTimeWorkerUsage(
1102 this.workerChoiceStrategyContext
,
1106 updateEluWorkerUsage(
1107 this.workerChoiceStrategyContext
,
1111 needWorkerChoiceStrategyUpdate
= true
1114 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1115 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1116 message
.taskPerformance
?.name
as string
1119 const taskFunctionWorkerUsage
= this.workerNodes
[
1121 ].getTaskFunctionWorkerUsage(
1122 message
.taskPerformance
?.name
as string
1124 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1125 updateRunTimeWorkerUsage(
1126 this.workerChoiceStrategyContext
,
1127 taskFunctionWorkerUsage
,
1130 updateEluWorkerUsage(
1131 this.workerChoiceStrategyContext
,
1132 taskFunctionWorkerUsage
,
1135 needWorkerChoiceStrategyUpdate
= true
1137 if (needWorkerChoiceStrategyUpdate
) {
1138 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1143 * Whether the worker node shall update its task function worker usage or not.
1145 * @param workerNodeKey - The worker node key.
1146 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1148 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1149 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1151 workerInfo
!= null &&
1152 Array.isArray(workerInfo
.taskFunctionNames
) &&
1153 workerInfo
.taskFunctionNames
.length
> 2
1158 * Chooses a worker node for the next task.
1160 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1162 * @returns The chosen worker node key
1164 private chooseWorkerNode (): number {
1165 if (this.shallCreateDynamicWorker()) {
1166 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1168 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1170 return workerNodeKey
1173 return this.workerChoiceStrategyContext
.execute()
1177 * Conditions for dynamic worker creation.
1179 * @returns Whether to create a dynamic worker or not.
1181 private shallCreateDynamicWorker (): boolean {
1182 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
1186 * Sends a message to worker given its worker node key.
1188 * @param workerNodeKey - The worker node key.
1189 * @param message - The message.
1190 * @param transferList - The optional array of transferable objects.
1192 protected abstract sendToWorker (
1193 workerNodeKey
: number,
1194 message
: MessageValue
<Data
>,
1195 transferList
?: TransferListItem
[]
1199 * Creates a new, completely set up worker node.
1201 * @returns New, completely set up worker node key.
1203 protected createAndSetupWorkerNode (): number {
1204 const workerNode
= this.createWorkerNode()
1205 workerNode
.registerWorkerEventHandler(
1207 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1209 workerNode
.registerWorkerEventHandler(
1211 this.opts
.messageHandler
?? EMPTY_FUNCTION
1213 workerNode
.registerWorkerEventHandler(
1215 this.opts
.errorHandler
?? EMPTY_FUNCTION
1217 workerNode
.registerWorkerEventHandler('error', (error
: Error) => {
1218 workerNode
.info
.ready
= false
1219 this.emitter
?.emit(PoolEvents
.error
, error
)
1224 this.opts
.restartWorkerOnError
=== true
1226 if (workerNode
.info
.dynamic
) {
1227 this.createAndSetupDynamicWorkerNode()
1229 this.createAndSetupWorkerNode()
1232 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1233 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
1235 workerNode
?.terminate().catch(error
=> {
1236 this.emitter
?.emit(PoolEvents
.error
, error
)
1239 workerNode
.registerWorkerEventHandler(
1241 this.opts
.exitHandler
?? EMPTY_FUNCTION
1243 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1244 this.removeWorkerNode(workerNode
)
1246 const workerNodeKey
= this.addWorkerNode(workerNode
)
1247 this.afterWorkerNodeSetup(workerNodeKey
)
1248 return workerNodeKey
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, this registers a listener that destroys the
 * worker node when it sends a matching kill message, sends a `checkActive` request to
 * the worker and forwards every task function registered on the pool to it.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const { kill, workerId } = message
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    const { tasks } = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker:
    // - a hard kill always destroys the worker node,
    // - a soft kill only does so once nothing is executing (and, with the
    //   tasks queue enabled, nothing is queued either).
    let shallDestroy = isKillBehavior(KillBehaviors.HARD, kill)
    if (
      !shallDestroy &&
      isKillBehavior(KillBehaviors.SOFT, kill) &&
      tasks.executing === 0
    ) {
      shallDestroy =
        this.opts.enableTasksQueue === true
          ? this.tasksQueueSize(localWorkerNodeKey) === 0
          : this.opts.enableTasksQueue === false
    }
    if (!shallDestroy) {
      return
    }
    // Flag the worker node as not ready immediately
    this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
    this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, { checkActive: true })
  // Iterating an empty map is a no-op, so no explicit size check is needed.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Registers once a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Deregisters a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup and
 * statistics messages and, when the tasks queue is enabled, subscribes the task
 * stealing handlers selected by the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const { tasksQueueOptions } = this.opts
  const workerNode = this.workerNodes[workerNodeKey]
  if (tasksQueueOptions?.taskStealing === true) {
    workerNode.on('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.on('backPressure', this.handleBackPressureEvent)
  }
}
/**
 * Sends the startup message to worker given its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `runTime` and `elu` aggregate flags taken from the task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
/**
 * Dispatches a task to a worker node: executes it right away when
 * `shallExecuteTask()` allows it, otherwise enqueues it on the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute or enqueue.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
/**
 * Moves every task queued on the given worker node to the other worker nodes.
 *
 * Each task goes to the ready worker node, other than the source one, that currently
 * has the fewest queued tasks (lowest key wins on ties).
 *
 * Unlike a plain "minimum starting from node 0" search, the source worker node and
 * not ready worker nodes are never selected as destination: handing a task back to
 * the node being drained would either lose it or loop forever.
 * If no eligible destination exists, the remaining tasks are left in place.
 *
 * @param workerNodeKey - The key of the worker node whose queue is redistributed, `-1` is a no-op.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.workerNodes.length <= 1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No ready worker node can take over the queued tasks.
      return
    }
    this.handleTask(
      destinationWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
/**
 * Increments the stolen tasks counter of a worker node and, when task function
 * usage tracking applies, of the given task function on that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    workerNode.usage.tasks.stolen += 1
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.stolen += 1
  }
}
/**
 * Increments the sequentially stolen tasks counter of a worker node, if it exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage == null) {
    return
  }
  usage.tasks.sequentiallyStolen += 1
}
/**
 * Increments the sequentially stolen tasks counter of a task function on a worker
 * node, when task function usage tracking applies and the usage entry exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen += 1
  }
}
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node, if it exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage == null) {
    return
  }
  usage.tasks.sequentiallyStolen = 0
}
/**
 * Resets to zero the sequentially stolen tasks counter of a task function on a
 * worker node, when task function usage tracking applies and the usage entry exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
/**
 * Handler of the worker node `idleWorkerNode` event: makes the idle worker node
 * steal a task from another worker node, then re-runs itself after a delay derived
 * from the number of sequentially stolen tasks.
 *
 * The chain stops, and the sequentially stolen counters are reset, once the worker
 * node has work again (a task executing or queued) after a previous steal.
 *
 * @param eventDetail - The worker node event detail, `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws Error If the event detail `workerNodeKey` attribute is not defined.
 */
private readonly handleIdleWorkerNodeEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  if (this.workerNodes.length <= 1) {
    return
  }
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey attribute must be defined'
    )
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  const isBusyAgainAfterSteal =
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  if (isBusyAgainAfterSteal) {
    const taskFunctionNames = this.workerNodes[workerNodeKey].info
      .taskFunctionNames as string[]
    for (const taskName of taskFunctionNames) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    const stolenTaskName = stolenTask.name as string
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(stolenTaskName)?.tasks as TaskStatistics
    // The steal counts as sequential for this task function when it is the first
    // one, or when the same task function was already stolen just before.
    const isSequentialSteal =
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    if (isSequentialSteal) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    }
  }
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
/**
 * Makes the given worker node steal one task from the ready worker node that has
 * the most queued tasks.
 *
 * The source worker node is searched in a copy of the worker nodes sorted by
 * descending queue size. Positions in that copy are not worker node keys, so the
 * stealing worker node is excluded by identity rather than by index; comparing the
 * sorted position with `workerNodeKey` could wrongly skip a valid source node.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, or `undefined` if no worker node has a task to steal.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      workerNode =>
        workerNode !== destinationWorkerNode &&
        workerNode.info.ready &&
        workerNode.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask() as Task<Data>
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name as string)
  return task
}
/**
 * Handler of the worker node `backPressure` event: moves tasks from the worker node
 * under back pressure to the other ready worker nodes whose queue is below the
 * configured tasks queue size minus one, least loaded first.
 *
 * Destinations are sorted together with their real worker node key. A position in
 * the sorted list is not a worker node key, so using it to dispatch the task and
 * update the statistics would target the wrong worker node.
 *
 * @param eventDetail - The worker node event detail.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (this.workerNodes.length <= 1) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    // The worker node emitting the event is no longer part of the pool.
    return
  }
  const destinations = Array.from(this.workerNodes.entries()).sort(
    ([, workerNodeA], [, workerNodeB]) =>
      workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
  )
  for (const [workerNodeKey, workerNode] of destinations) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      const task = sourceWorkerNode.popTask() as Task<Data>
      this.handleTask(workerNodeKey, task)
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
/**
 * This method is the message listener registered on each worker.
 *
 * It routes a message according to its content: worker ready response, task
 * execution response or task function names update.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
/**
 * Handles a worker ready response: stores the ready flag and the task function
 * names on the worker information, then emits the pool `ready` event the first
 * time the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws Error If the worker reports that it failed to initialize.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
/**
 * Handles a task execution response received from a worker.
 *
 * Settles the promise bound to the task id (inside the task async resource scope
 * when there is one), runs the after task execution hook and drops the promise map
 * entry. When the tasks queue is enabled and the pool is not being destroyed, it
 * then executes the next queued task if the concurrency allows it, and emits
 * `idleWorkerNode` when the worker node has nothing left to do.
 *
 * Messages whose task id has no pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else {
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(resolve, this.emitter, data)
    } else {
      resolve(data as Response)
    }
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  workerNode?.emit('taskFinished', taskId)
  // The worker node may no longer be in the pool when its response is handled,
  // hence the optional call above: skip the queue handling in that case instead
  // of dereferencing an undefined worker node.
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    workerNode == null
  ) {
    return
  }
  const workerNodeTasksUsage = workerNode.usage.tasks
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    workerNodeTasksUsage.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  if (
    workerNodeTasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    workerNodeTasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idleWorkerNode', {
      workerId: workerId as number,
      workerNodeKey
    })
  }
}
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
/**
 * Emits the pool `backPressure` event, with the pool information, when the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
/**
 * Emits the pool `full` event, with the pool information, when the pool is a
 * dynamic pool and is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information (the lookup is optional-chained, so an unknown key yields `undefined` at runtime).
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
/**
 * Creates a worker node.
 *
 * The tasks queue back pressure size is the configured tasks queue size or,
 * when it is not set, the default one computed from the pool size.
 * A worker node created while the pool is starting is flagged as ready.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const workerNode = new WorkerNode<Worker, Data>(this.worker, this.filePath, {
    env: this.opts.env,
    workerOptions: this.opts.workerOptions,
    tasksQueueBackPressureSize
  })
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  return workerNode
}
/**
 * Adds the given worker node in the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.workerNodes.indexOf(workerNode)
  if (workerNodeKey !== -1) {
    return workerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
/**
 * Removes the worker node from the pool worker nodes and from the worker choice
 * strategy context. Does nothing if the worker node is not part of the pool.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const workerNodeKey = this.workerNodes.indexOf(workerNode)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
/**
 * Flags the worker node as not ready given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
/**
 * Whether the worker node has back pressure given its worker node key.
 * Always `false` when the tasks queue is not enabled.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the tasks queue is enabled and the worker node has back pressure, `false` otherwise.
 */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
/**
 * Whether the pool has back pressure, i.e. the tasks queue is enabled and every
 * worker node has back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
/**
 * Executes the given task on the worker given its worker node key.
 *
 * Runs the before task execution hook, sends the task (with its transfer list) to
 * the worker, then checks whether task execution events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node given its worker node key, then
 * checks whether task queuing events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`, i.e. its tasks queue size.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const tasksQueueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return tasksQueueSize
}
/**
 * Dequeues a task from the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker node tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
/**
 * Flushes the tasks queue of the worker node given its worker node key: every
 * queued task is dequeued and executed, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}