1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
5 import { AsyncResource
} from
'node:async_hooks'
8 PromiseResponseWrapper
,
10 } from
'../utility-types'
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
25 import { KillBehaviors
} from
'../worker/worker-options'
26 import type { TaskFunction
} from
'../worker/task-functions'
34 type TasksQueueOptions
41 WorkerNodeEventDetail
,
47 WorkerChoiceStrategies
,
48 type WorkerChoiceStrategy
,
49 type WorkerChoiceStrategyOptions
50 } from
'./selection-strategies/selection-strategies-types'
51 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
52 import { version
} from
'./version'
53 import { WorkerNode
} from
'./worker-node'
56 checkValidTasksQueueOptions
,
57 checkValidWorkerChoiceStrategy
,
58 getDefaultTasksQueueOptions
,
60 updateRunTimeWorkerUsage
,
61 updateTaskStatisticsWorkerUsage
,
62 updateWaitTimeWorkerUsage
,
67 * Base class that implements some shared logic for all poolifier pools.
69 * @typeParam Worker - Type of worker which manages this pool.
70 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
71 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
73 export abstract class AbstractPool
<
74 Worker
extends IWorker
,
77 > implements IPool
<Worker
, Data
, Response
> {
79 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
82 public emitter
?: EventEmitterAsyncResource
85 * Dynamic pool maximum size property placeholder.
87 protected readonly max
?: number
90 * The task execution response promise map:
91 * - `key`: The message id of each submitted task.
92 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
94 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
96 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
97 new Map
<string, PromiseResponseWrapper
<Response
>>()
100 * Worker choice strategy context referencing a worker choice algorithm implementation.
102 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
109 * The task functions added at runtime map:
110 * - `key`: The task function name.
111 * - `value`: The task function itself.
113 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
116 * Whether the pool is started or not.
118 private started
: boolean
120 * Whether the pool is starting or not.
122 private starting
: boolean
124 * Whether the pool is destroying or not.
126 private destroying
: boolean
128 * Whether the pool ready event has been emitted or not.
130 private readyEventEmitted
: boolean
132 * The start timestamp of the pool.
134 private readonly startTimestamp
137 * Constructs a new poolifier pool.
139 * @param numberOfWorkers - Number of workers that this pool should manage.
140 * @param filePath - Path to the worker file.
141 * @param opts - Options for the pool.
144 protected readonly numberOfWorkers
: number,
145 protected readonly filePath
: string,
146 protected readonly opts
: PoolOptions
<Worker
>
148 if (!this.isMain()) {
150 'Cannot start a pool from a worker with the same type as the pool'
153 checkFilePath(this.filePath
)
154 this.checkNumberOfWorkers(this.numberOfWorkers
)
155 this.checkPoolOptions(this.opts
)
157 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
158 this.executeTask
= this.executeTask
.bind(this)
159 this.enqueueTask
= this.enqueueTask
.bind(this)
161 if (this.opts
.enableEvents
=== true) {
162 this.initializeEventEmitter()
164 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
170 this.opts
.workerChoiceStrategy
,
171 this.opts
.workerChoiceStrategyOptions
176 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
179 this.starting
= false
180 this.destroying
= false
181 this.readyEventEmitted
= false
182 if (this.opts
.startWorkers
=== true) {
186 this.startTimestamp
= performance
.now()
189 private checkNumberOfWorkers (numberOfWorkers
: number): void {
190 if (numberOfWorkers
== null) {
192 'Cannot instantiate a pool without specifying the number of workers'
194 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
196 'Cannot instantiate a pool with a non safe integer number of workers'
198 } else if (numberOfWorkers
< 0) {
199 throw new RangeError(
200 'Cannot instantiate a pool with a negative number of workers'
202 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
203 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
207 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
208 if (isPlainObject(opts
)) {
209 this.opts
.startWorkers
= opts
.startWorkers
?? true
210 checkValidWorkerChoiceStrategy(
211 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
213 this.opts
.workerChoiceStrategy
=
214 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
215 this.checkValidWorkerChoiceStrategyOptions(
216 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
218 this.opts
.workerChoiceStrategyOptions
= {
219 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
220 ...opts
.workerChoiceStrategyOptions
222 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
223 this.opts
.enableEvents
= opts
.enableEvents
?? true
224 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
225 if (this.opts
.enableTasksQueue
) {
226 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
227 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
228 opts
.tasksQueueOptions
as TasksQueueOptions
232 throw new TypeError('Invalid pool options: must be a plain object')
236 private checkValidWorkerChoiceStrategyOptions (
237 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
240 workerChoiceStrategyOptions
!= null &&
241 !isPlainObject(workerChoiceStrategyOptions
)
244 'Invalid worker choice strategy options: must be a plain object'
248 workerChoiceStrategyOptions
?.retries
!= null &&
249 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
252 'Invalid worker choice strategy options: retries must be an integer'
256 workerChoiceStrategyOptions
?.retries
!= null &&
257 workerChoiceStrategyOptions
.retries
< 0
259 throw new RangeError(
260 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
264 workerChoiceStrategyOptions
?.weights
!= null &&
265 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
268 'Invalid worker choice strategy options: must have a weight for each worker node'
272 workerChoiceStrategyOptions
?.measurement
!= null &&
273 !Object.values(Measurements
).includes(
274 workerChoiceStrategyOptions
.measurement
278 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
/**
 * Creates the pool event emitter and stores it in `this.emitter`.
 *
 * The async resource name is built from the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
290 public get
info (): PoolInfo
{
295 started
: this.started
,
297 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
298 minSize
: this.minSize
,
299 maxSize
: this.maxSize
,
300 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
301 .runTime
.aggregate
&&
302 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
303 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
304 workerNodes
: this.workerNodes
.length
,
305 idleWorkerNodes
: this.workerNodes
.reduce(
306 (accumulator
, workerNode
) =>
307 workerNode
.usage
.tasks
.executing
=== 0
312 busyWorkerNodes
: this.workerNodes
.reduce(
313 (accumulator
, _workerNode
, workerNodeKey
) =>
314 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
317 executedTasks
: this.workerNodes
.reduce(
318 (accumulator
, workerNode
) =>
319 accumulator
+ workerNode
.usage
.tasks
.executed
,
322 executingTasks
: this.workerNodes
.reduce(
323 (accumulator
, workerNode
) =>
324 accumulator
+ workerNode
.usage
.tasks
.executing
,
327 ...(this.opts
.enableTasksQueue
=== true && {
328 queuedTasks
: this.workerNodes
.reduce(
329 (accumulator
, workerNode
) =>
330 accumulator
+ workerNode
.usage
.tasks
.queued
,
334 ...(this.opts
.enableTasksQueue
=== true && {
335 maxQueuedTasks
: this.workerNodes
.reduce(
336 (accumulator
, workerNode
) =>
337 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
341 ...(this.opts
.enableTasksQueue
=== true && {
342 backPressure
: this.hasBackPressure()
344 ...(this.opts
.enableTasksQueue
=== true && {
345 stolenTasks
: this.workerNodes
.reduce(
346 (accumulator
, workerNode
) =>
347 accumulator
+ workerNode
.usage
.tasks
.stolen
,
351 failedTasks
: this.workerNodes
.reduce(
352 (accumulator
, workerNode
) =>
353 accumulator
+ workerNode
.usage
.tasks
.failed
,
356 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
357 .runTime
.aggregate
&& {
361 ...this.workerNodes
.map(
362 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
368 ...this.workerNodes
.map(
369 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
373 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
374 .runTime
.average
&& {
377 this.workerNodes
.reduce
<number[]>(
378 (accumulator
, workerNode
) =>
379 accumulator
.concat(workerNode
.usage
.runTime
.history
),
385 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
389 this.workerNodes
.reduce
<number[]>(
390 (accumulator
, workerNode
) =>
391 accumulator
.concat(workerNode
.usage
.runTime
.history
),
399 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
400 .waitTime
.aggregate
&& {
404 ...this.workerNodes
.map(
405 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
411 ...this.workerNodes
.map(
412 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
416 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
417 .waitTime
.average
&& {
420 this.workerNodes
.reduce
<number[]>(
421 (accumulator
, workerNode
) =>
422 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
428 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
429 .waitTime
.median
&& {
432 this.workerNodes
.reduce
<number[]>(
433 (accumulator
, workerNode
) =>
434 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
446 * The pool readiness boolean status.
448 private get
ready (): boolean {
450 this.workerNodes
.reduce(
451 (accumulator
, workerNode
) =>
452 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
461 * The approximate pool utilization.
463 * @returns The pool utilization.
/**
 * The approximate pool utilization.
 *
 * Computed as the sum of every worker node aggregated run time and wait time,
 * divided by the time elapsed since `startTimestamp` multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  // The capacity is sampled first, before the usage figures are read.
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
484 * If it is `'dynamic'`, it provides the `max` property.
486 protected abstract get
type (): PoolType
491 protected abstract get
worker (): WorkerType
494 * The pool minimum size.
/**
 * The pool minimum size.
 *
 * @returns The number of workers given at pool construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
501 * The pool maximum size.
/**
 * The pool maximum size.
 *
 * @returns The `max` property when it is defined, the number of workers otherwise.
 */
protected get maxSize (): number {
  if (this.max != null) {
    return this.max
  }
  return this.numberOfWorkers
}
508 * Checks if the worker id sent in the received message from a worker is valid.
510 * @param message - The received message.
511 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
/**
 * Checks that the worker id carried by a message received from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  if (message.workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
524 * Gets the worker node key given its worker id.
526 * @param workerId - The worker id.
527 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The key of the first worker node whose `info.id` equals the worker id, `-1` if there is none.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
/**
 * Sets the worker choice strategy used by the pool.
 *
 * The strategy is validated, stored in the pool options and forwarded to the
 * worker choice strategy context. Afterwards every worker node usage is reset
 * and a statistics message is sent to each worker.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @param workerChoiceStrategyOptions - The worker choice strategy options, applied only when defined.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
/**
 * Sets the worker choice strategy options used by the pool.
 *
 * The given options are validated, merged on top of
 * `DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS`, stored in the pool options and
 * forwarded to the worker choice strategy context.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const mergedOptions: WorkerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
569 public enableTasksQueue (
571 tasksQueueOptions
?: TasksQueueOptions
573 if (this.opts
.enableTasksQueue
=== true && !enable
) {
574 this.unsetTaskStealing()
575 this.unsetTasksStealingOnBackPressure()
576 this.flushTasksQueues()
578 this.opts
.enableTasksQueue
= enable
579 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
/**
 * Sets the pool tasks queue options.
 *
 * When the tasks queue is enabled, the options are validated, completed by
 * `buildTasksQueueOptions()`, applied to the worker nodes queue size, and the
 * task stealing listeners are re-registered or removed according to the
 * resulting options. When the tasks queue is not enabled, any stored tasks
 * queue options are deleted.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const queueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = queueOptions
  this.setTasksQueueSize(queueOptions.size as number)
  // Listeners are always removed first so they are never registered twice.
  this.unsetTaskStealing()
  if (queueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (queueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
606 private buildTasksQueueOptions (
607 tasksQueueOptions
: TasksQueueOptions
608 ): TasksQueueOptions
{
610 ...getDefaultTasksQueueOptions(this.maxSize
),
/**
 * Assigns the given size to the `tasksQueueBackPressureSize` of every worker node.
 *
 * @param size - The size to assign.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
621 private setTaskStealing (): void {
622 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
623 this.workerNodes
[workerNodeKey
].on(
625 this.handleIdleWorkerNodeEvent
630 private unsetTaskStealing (): void {
631 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
632 this.workerNodes
[workerNodeKey
].off(
634 this.handleIdleWorkerNodeEvent
639 private setTasksStealingOnBackPressure (): void {
640 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
641 this.workerNodes
[workerNodeKey
].on(
643 this.handleBackPressureEvent
648 private unsetTasksStealingOnBackPressure (): void {
649 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
650 this.workerNodes
[workerNodeKey
].off(
652 this.handleBackPressureEvent
658 * Whether the pool is full or not.
660 * The pool filling boolean status.
/**
 * Whether the pool is full or not.
 *
 * @returns `true` when the number of worker nodes has reached the pool maximum size, `false` otherwise.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
667 * Whether the pool is busy or not.
669 * The pool busyness boolean status.
671 protected abstract get
busy (): boolean
674 * Whether the worker nodes are concurrently executing their tasks quota or not.
676 * @returns Worker nodes busyness boolean status.
678 protected internalBusy (): boolean {
679 if (this.opts
.enableTasksQueue
=== true) {
681 this.workerNodes
.findIndex(
683 workerNode
.info
.ready
&&
684 workerNode
.usage
.tasks
.executing
<
685 (this.opts
.tasksQueueOptions
?.concurrency
as number)
690 this.workerNodes
.findIndex(
692 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
/**
 * Whether the worker node given its key is busy or not.
 *
 * With the tasks queue enabled, a worker node is busy once its number of
 * executing tasks reaches the tasks queue `concurrency` option. Otherwise it
 * is busy as soon as it executes at least one task.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return this.opts.enableTasksQueue === true
    ? executing >= (this.opts.tasksQueueOptions?.concurrency as number)
    : executing > 0
}
707 private async sendTaskFunctionOperationToWorker (
708 workerNodeKey
: number,
709 message
: MessageValue
<Data
>
710 ): Promise
<boolean> {
711 return await new Promise
<boolean>((resolve
, reject
) => {
712 const taskFunctionOperationListener
= (
713 message
: MessageValue
<Response
>
715 this.checkMessageWorkerId(message
)
716 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
718 message
.taskFunctionOperationStatus
!= null &&
719 message
.workerId
=== workerId
721 if (message
.taskFunctionOperationStatus
) {
723 } else if (!message
.taskFunctionOperationStatus
) {
726 `Task function operation '${
727 message.taskFunctionOperation as string
728 }' failed on worker ${message.workerId} with error: '${
729 message.workerError?.message as string
734 this.deregisterWorkerMessageListener(
735 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
736 taskFunctionOperationListener
740 this.registerWorkerMessageListener(
742 taskFunctionOperationListener
744 this.sendToWorker(workerNodeKey
, message
)
748 private async sendTaskFunctionOperationToWorkers (
749 message
: MessageValue
<Data
>
750 ): Promise
<boolean> {
751 return await new Promise
<boolean>((resolve
, reject
) => {
752 const responsesReceived
= new Array<MessageValue
<Response
>>()
753 const taskFunctionOperationsListener
= (
754 message
: MessageValue
<Response
>
756 this.checkMessageWorkerId(message
)
757 if (message
.taskFunctionOperationStatus
!= null) {
758 responsesReceived
.push(message
)
759 if (responsesReceived
.length
=== this.workerNodes
.length
) {
761 responsesReceived
.every(
762 message
=> message
.taskFunctionOperationStatus
=== true
767 responsesReceived
.some(
768 message
=> message
.taskFunctionOperationStatus
=== false
771 const errorResponse
= responsesReceived
.find(
772 response
=> response
.taskFunctionOperationStatus
=== false
776 `Task function operation '${
777 message.taskFunctionOperation as string
778 }' failed on worker ${
779 errorResponse?.workerId as number
781 errorResponse?.workerError?.message as string
786 this.deregisterWorkerMessageListener(
787 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
788 taskFunctionOperationsListener
793 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
794 this.registerWorkerMessageListener(
796 taskFunctionOperationsListener
798 this.sendToWorker(workerNodeKey
, message
)
/**
 * Whether at least one worker node lists the given task function name.
 *
 * @param name - The task function name.
 * @returns `true` if a worker node `info.taskFunctionNames` array includes the name, `false` otherwise.
 */
public hasTaskFunction (name: string): boolean {
  return this.workerNodes.some(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.includes(name)
  )
}
817 public async addTaskFunction (
819 fn
: TaskFunction
<Data
, Response
>
820 ): Promise
<boolean> {
821 if (typeof name
!== 'string') {
822 throw new TypeError('name argument must be a string')
824 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
825 throw new TypeError('name argument must not be an empty string')
827 if (typeof fn
!== 'function') {
828 throw new TypeError('fn argument must be a function')
830 const opResult
= await this.sendTaskFunctionOperationToWorkers({
831 taskFunctionOperation
: 'add',
832 taskFunctionName
: name
,
833 taskFunction
: fn
.toString()
835 this.taskFunctions
.set(name
, fn
)
840 public async removeTaskFunction (name
: string): Promise
<boolean> {
841 if (!this.taskFunctions
.has(name
)) {
843 'Cannot remove a task function not handled on the pool side'
846 const opResult
= await this.sendTaskFunctionOperationToWorkers({
847 taskFunctionOperation
: 'remove',
848 taskFunctionName
: name
850 this.deleteTaskFunctionWorkerUsages(name
)
851 this.taskFunctions
.delete(name
)
/**
 * Lists the task function names known by the pool workers.
 *
 * @returns The `info.taskFunctionNames` of the first worker node holding a non-empty array, an empty array if there is none.
 */
public listTaskFunctionNames (): string[] {
  const workerNodeWithNames = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
/**
 * Asks the workers to use the given task function as the default one.
 *
 * Sends a `'default'` task function operation message through
 * `sendTaskFunctionOperationToWorkers()`.
 *
 * @param name - The task function name.
 * @returns The result of the task function operation.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return operationResult
}
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
/**
 * Whether a task shall be executed right away on the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and its number of executing tasks is below the tasks queue `concurrency` option, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return executing < (this.opts.tasksQueueOptions?.concurrency as number)
}
891 public async execute (
894 transferList
?: TransferListItem
[]
895 ): Promise
<Response
> {
896 return await new Promise
<Response
>((resolve
, reject
) => {
898 reject(new Error('Cannot execute a task on not started pool'))
901 if (this.destroying
) {
902 reject(new Error('Cannot execute a task on destroying pool'))
905 if (name
!= null && typeof name
!== 'string') {
906 reject(new TypeError('name argument must be a string'))
911 typeof name
=== 'string' &&
912 name
.trim().length
=== 0
914 reject(new TypeError('name argument must not be an empty string'))
917 if (transferList
!= null && !Array.isArray(transferList
)) {
918 reject(new TypeError('transferList argument must be an array'))
921 const timestamp
= performance
.now()
922 const workerNodeKey
= this.chooseWorkerNode()
923 const task
: Task
<Data
> = {
924 name
: name
?? DEFAULT_TASK_NAME
,
925 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
926 data
: data
?? ({} as Data
),
931 this.promiseResponseMap
.set(task
.taskId
as string, {
935 ...(this.emitter
!= null && {
936 asyncResource
: new AsyncResource('poolifier:task', {
937 triggerAsyncId
: this.emitter
.asyncId
,
938 requireManualDestroy
: true
943 this.opts
.enableTasksQueue
=== false ||
944 (this.opts
.enableTasksQueue
=== true &&
945 this.shallExecuteTask(workerNodeKey
))
947 this.executeTask(workerNodeKey
, task
)
949 this.enqueueTask(workerNodeKey
, task
)
955 public start (): void {
957 throw new Error('Cannot start an already started pool')
960 throw new Error('Cannot start an already starting pool')
962 if (this.destroying
) {
963 throw new Error('Cannot start a destroying pool')
967 this.workerNodes
.reduce(
968 (accumulator
, workerNode
) =>
969 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
971 ) < this.numberOfWorkers
973 this.createAndSetupWorkerNode()
975 this.starting
= false
980 public async destroy (): Promise
<void> {
982 throw new Error('Cannot destroy an already destroyed pool')
985 throw new Error('Cannot destroy an starting pool')
987 if (this.destroying
) {
988 throw new Error('Cannot destroy an already destroying pool')
990 this.destroying
= true
992 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
993 await this.destroyWorkerNode(workerNodeKey
)
996 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
997 this.emitter
?.emitDestroy()
998 this.emitter
?.removeAllListeners()
999 this.readyEventEmitted
= false
1000 this.destroying
= false
1001 this.started
= false
1004 protected async sendKillMessageToWorker (
1005 workerNodeKey
: number
1007 await new Promise
<void>((resolve
, reject
) => {
1008 if (workerNodeKey
< 0 || workerNodeKey
>= this.workerNodes
.length
) {
1009 reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
1012 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1013 this.checkMessageWorkerId(message
)
1014 if (message
.kill
=== 'success') {
1016 } else if (message
.kill
=== 'failure') {
1019 `Kill message handling failed on worker ${
1020 message.workerId as number
1026 // FIXME: should be registered only once
1027 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1028 this.sendToWorker(workerNodeKey
, { kill
: true })
1033 * Terminates the worker node given its worker node key.
1035 * @param workerNodeKey - The worker node key.
1037 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1038 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1039 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1040 const workerNode
= this.workerNodes
[workerNodeKey
]
1041 await waitWorkerNodeEvents(
1045 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1046 getDefaultTasksQueueOptions(this.maxSize
).tasksFinishedTimeout
1048 await this.sendKillMessageToWorker(workerNodeKey
)
1049 await workerNode
.terminate()
1053 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1054 * Can be overridden.
/**
 * Setup hook that does nothing by default.
 * Can be overridden.
 */
protected setupHook (): void {
  // No-op on purpose
}
1063 * Should return whether the worker is the main worker or not.
1065 protected abstract isMain (): boolean
1068 * Hook executed before the worker task execution.
1069 * Can be overridden.
1071 * @param workerNodeKey - The worker node key.
1072 * @param task - The task to execute.
1074 protected beforeTaskExecutionHook (
1075 workerNodeKey
: number,
1078 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1079 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1080 ++workerUsage
.tasks
.executing
1081 updateWaitTimeWorkerUsage(
1082 this.workerChoiceStrategyContext
,
1088 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1089 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1093 const taskFunctionWorkerUsage
= this.workerNodes
[
1095 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1096 ++taskFunctionWorkerUsage
.tasks
.executing
1097 updateWaitTimeWorkerUsage(
1098 this.workerChoiceStrategyContext
,
1099 taskFunctionWorkerUsage
,
1106 * Hook executed after the worker task execution.
1107 * Can be overridden.
1109 * @param workerNodeKey - The worker node key.
1110 * @param message - The received message.
1112 protected afterTaskExecutionHook (
1113 workerNodeKey
: number,
1114 message
: MessageValue
<Response
>
1116 let needWorkerChoiceStrategyUpdate
= false
1117 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1118 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1119 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1120 updateRunTimeWorkerUsage(
1121 this.workerChoiceStrategyContext
,
1125 updateEluWorkerUsage(
1126 this.workerChoiceStrategyContext
,
1130 needWorkerChoiceStrategyUpdate
= true
1133 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1134 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1135 message
.taskPerformance
?.name
as string
1138 const taskFunctionWorkerUsage
= this.workerNodes
[
1140 ].getTaskFunctionWorkerUsage(
1141 message
.taskPerformance
?.name
as string
1143 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1144 updateRunTimeWorkerUsage(
1145 this.workerChoiceStrategyContext
,
1146 taskFunctionWorkerUsage
,
1149 updateEluWorkerUsage(
1150 this.workerChoiceStrategyContext
,
1151 taskFunctionWorkerUsage
,
1154 needWorkerChoiceStrategyUpdate
= true
1156 if (needWorkerChoiceStrategyUpdate
) {
1157 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1162 * Whether the worker node shall update its task function worker usage or not.
1164 * @param workerNodeKey - The worker node key.
1165 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker info exists and lists more than two task function names, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  // NOTE(review): the threshold of two presumably excludes default entries; confirm against the worker side.
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1177 * Chooses a worker node for the next task.
1179 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1181 * @returns The chosen worker node key
/**
 * Chooses a worker node for the next task.
 *
 * When `shallCreateDynamicWorker()` is true, a dynamic worker node is created
 * and its key is returned if the strategy policy `dynamicWorkerUsage` is set.
 * In every other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key.
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1196 * Conditions for dynamic worker creation.
1198 * @returns Whether to create a dynamic worker or not.
/**
 * Conditions for dynamic worker creation.
 *
 * @returns `true` if the pool type is dynamic, the pool is not full and `internalBusy()` is true, `false` otherwise.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1205 * Sends a message to worker given its worker node key.
1207 * @param workerNodeKey - The worker node key.
1208 * @param message - The message.
1209 * @param transferList - The optional array of transferable objects.
1211 protected abstract sendToWorker (
1212 workerNodeKey
: number,
1213 message
: MessageValue
<Data
>,
1214 transferList
?: TransferListItem
[]
1218 * Creates a new, completely set up worker node.
1220 * @returns New, completely set up worker node key.
1222 protected createAndSetupWorkerNode (): number {
1223 const workerNode
= this.createWorkerNode()
1224 workerNode
.registerWorkerEventHandler(
1226 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1228 workerNode
.registerWorkerEventHandler(
1230 this.opts
.messageHandler
?? EMPTY_FUNCTION
1232 workerNode
.registerWorkerEventHandler(
1234 this.opts
.errorHandler
?? EMPTY_FUNCTION
1236 workerNode
.registerWorkerEventHandler('error', (error
: Error) => {
1237 workerNode
.info
.ready
= false
1238 this.emitter
?.emit(PoolEvents
.error
, error
)
1243 this.opts
.restartWorkerOnError
=== true
1245 if (workerNode
.info
.dynamic
) {
1246 this.createAndSetupDynamicWorkerNode()
1248 this.createAndSetupWorkerNode()
1251 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1252 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
1254 workerNode
?.terminate().catch(error
=> {
1255 this.emitter
?.emit(PoolEvents
.error
, error
)
1258 workerNode
.registerWorkerEventHandler(
1260 this.opts
.exitHandler
?? EMPTY_FUNCTION
1262 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1263 this.removeWorkerNode(workerNode
)
1265 const workerNodeKey
= this.addWorkerNode(workerNode
)
1266 this.afterWorkerNodeSetup(workerNodeKey
)
1267 return workerNodeKey
1271 * Creates a new, completely set up dynamic worker node.
1273 * @returns New, completely set up dynamic worker node key.
1275 protected createAndSetupDynamicWorkerNode (): number {
1276 const workerNodeKey
= this.createAndSetupWorkerNode()
1277 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1278 this.checkMessageWorkerId(message
)
1279 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1282 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1283 // Kill message received from worker
1285 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1286 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1287 ((this.opts
.enableTasksQueue
=== false &&
1288 workerUsage
.tasks
.executing
=== 0) ||
1289 (this.opts
.enableTasksQueue
=== true &&
1290 workerUsage
.tasks
.executing
=== 0 &&
1291 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1293 // Flag the worker node as not ready immediately
1294 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1295 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1296 this.emitter
?.emit(PoolEvents
.error
, error
)
1300 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1301 this.sendToWorker(workerNodeKey
, {
1304 if (this.taskFunctions
.size
> 0) {
1305 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1306 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1307 taskFunctionOperation
: 'add',
1309 taskFunction
: taskFunction
.toString()
1311 this.emitter
?.emit(PoolEvents
.error
, error
)
1315 workerInfo
.dynamic
= true
1317 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1318 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1320 workerInfo
.ready
= true
1322 this.checkAndEmitDynamicWorkerCreationEvents()
1323 return workerNodeKey
1327 * Registers a listener callback on the worker given its worker node key.
1329 * @param workerNodeKey - The worker node key.
1330 * @param listener - The message listener callback.
1332 protected abstract registerWorkerMessageListener
<
1333 Message
extends Data
| Response
1335 workerNodeKey
: number,
1336 listener
: (message
: MessageValue
<Message
>) => void
1340 * Registers once a listener callback on the worker given its worker node key.
1342 * @param workerNodeKey - The worker node key.
1343 * @param listener - The message listener callback.
1345 protected abstract registerOnceWorkerMessageListener
<
1346 Message
extends Data
| Response
1348 workerNodeKey
: number,
1349 listener
: (message
: MessageValue
<Message
>) => void
1353 * Deregisters a listener callback on the worker given its worker node key.
1355 * @param workerNodeKey - The worker node key.
1356 * @param listener - The message listener callback.
1358 protected abstract deregisterWorkerMessageListener
<
1359 Message
extends Data
| Response
1361 workerNodeKey
: number,
1362 listener
: (message
: MessageValue
<Message
>) => void
1366 * Method hooked up after a worker node has been newly created.
1367 * Can be overridden.
1369 * @param workerNodeKey - The newly created worker node key.
1371 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1372 // Listen to worker messages.
1373 this.registerWorkerMessageListener(
1375 this.workerMessageListener
1377 // Send the startup message to worker.
1378 this.sendStartupMessageToWorker(workerNodeKey
)
1379 // Send the statistics message to worker.
1380 this.sendStatisticsMessageToWorker(workerNodeKey
)
1381 if (this.opts
.enableTasksQueue
=== true) {
1382 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1383 this.workerNodes
[workerNodeKey
].on(
1385 this.handleIdleWorkerNodeEvent
1388 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1389 this.workerNodes
[workerNodeKey
].on(
1391 this.handleBackPressureEvent
1398 * Sends the startup message to worker given its worker node key.
1400 * @param workerNodeKey - The worker node key.
1402 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1405 * Sends the statistics message to worker given its worker node key.
1407 * @param workerNodeKey - The worker node key.
1409 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1410 this.sendToWorker(workerNodeKey
, {
1413 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1415 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1421 private handleTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1422 if (this.shallExecuteTask(workerNodeKey
)) {
1423 this.executeTask(workerNodeKey
, task
)
1425 this.enqueueTask(workerNodeKey
, task
)
1429 private redistributeQueuedTasks (workerNodeKey
: number): void {
1430 if (workerNodeKey
=== -1) {
1433 if (this.workerNodes
.length
<= 1) {
1436 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1437 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1438 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1439 return workerNode
.info
.ready
&&
1440 workerNode
.usage
.tasks
.queued
<
1441 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1448 destinationWorkerNodeKey
,
1449 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1454 private updateTaskStolenStatisticsWorkerUsage (
1455 workerNodeKey
: number,
1458 const workerNode
= this.workerNodes
[workerNodeKey
]
1459 if (workerNode
?.usage
!= null) {
1460 ++workerNode
.usage
.tasks
.stolen
1463 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1464 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1466 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1469 ++taskFunctionWorkerUsage
.tasks
.stolen
1473 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1474 workerNodeKey
: number
1476 const workerNode
= this.workerNodes
[workerNodeKey
]
1477 if (workerNode
?.usage
!= null) {
1478 ++workerNode
.usage
.tasks
.sequentiallyStolen
1482 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1483 workerNodeKey
: number,
1486 const workerNode
= this.workerNodes
[workerNodeKey
]
1488 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1489 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1491 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1494 ++taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
1498 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1499 workerNodeKey
: number
1501 const workerNode
= this.workerNodes
[workerNodeKey
]
1502 if (workerNode
?.usage
!= null) {
1503 workerNode
.usage
.tasks
.sequentiallyStolen
= 0
1507 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1508 workerNodeKey
: number,
1511 const workerNode
= this.workerNodes
[workerNodeKey
]
1513 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1514 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1516 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1519 taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
= 0
1523 private readonly handleIdleWorkerNodeEvent
= (
1524 eventDetail
: WorkerNodeEventDetail
,
1525 previousStolenTask
?: Task
<Data
>
1527 if (this.workerNodes
.length
<= 1) {
1530 const { workerNodeKey
} = eventDetail
1531 if (workerNodeKey
== null) {
1533 'WorkerNode event detail workerNodeKey attribute must be defined'
1536 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1538 previousStolenTask
!= null &&
1539 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1540 (workerNodeTasksUsage
.executing
> 0 ||
1541 this.tasksQueueSize(workerNodeKey
) > 0)
1543 for (const taskName
of this.workerNodes
[workerNodeKey
].info
1544 .taskFunctionNames
as string[]) {
1545 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1550 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1553 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1555 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1558 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1560 ].getTaskFunctionWorkerUsage(stolenTask
.name
as string)
1561 ?.tasks
as TaskStatistics
1563 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1564 (previousStolenTask
!= null &&
1565 previousStolenTask
.name
=== stolenTask
.name
&&
1566 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1568 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1570 stolenTask
.name
as string
1573 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1575 stolenTask
.name
as string
1579 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1581 this.handleIdleWorkerNodeEvent(eventDetail
, stolenTask
)
1584 .catch(EMPTY_FUNCTION
)
1587 private readonly workerNodeStealTask
= (
1588 workerNodeKey
: number
1589 ): Task
<Data
> | undefined => {
1590 const workerNodes
= this.workerNodes
1593 (workerNodeA
, workerNodeB
) =>
1594 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1596 const sourceWorkerNode
= workerNodes
.find(
1597 (sourceWorkerNode
, sourceWorkerNodeKey
) =>
1598 sourceWorkerNode
.info
.ready
&&
1599 sourceWorkerNodeKey
!== workerNodeKey
&&
1600 sourceWorkerNode
.usage
.tasks
.queued
> 0
1602 if (sourceWorkerNode
!= null) {
1603 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1604 this.handleTask(workerNodeKey
, task
)
1605 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1606 this.updateTaskStolenStatisticsWorkerUsage(
1614 private readonly handleBackPressureEvent
= (
1615 eventDetail
: WorkerNodeEventDetail
1617 if (this.workerNodes
.length
<= 1) {
1620 const { workerId
} = eventDetail
1621 const sizeOffset
= 1
1622 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1625 const sourceWorkerNode
=
1626 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1627 const workerNodes
= this.workerNodes
1630 (workerNodeA
, workerNodeB
) =>
1631 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1633 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1635 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1636 workerNode
.info
.ready
&&
1637 workerNode
.info
.id
!== workerId
&&
1638 workerNode
.usage
.tasks
.queued
<
1639 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1641 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1642 this.handleTask(workerNodeKey
, task
)
1643 this.updateTaskStolenStatisticsWorkerUsage(
1652 * This method is the message listener registered on each worker.
1654 protected readonly workerMessageListener
= (
1655 message
: MessageValue
<Response
>
1657 this.checkMessageWorkerId(message
)
1658 const { workerId
, ready
, taskId
, taskFunctionNames
} = message
1659 if (ready
!= null && taskFunctionNames
!= null) {
1660 // Worker ready response received from worker
1661 this.handleWorkerReadyResponse(message
)
1662 } else if (taskId
!= null) {
1663 // Task execution response received from worker
1664 this.handleTaskExecutionResponse(message
)
1665 } else if (taskFunctionNames
!= null) {
1666 // Task function names message received from worker
1668 this.getWorkerNodeKeyByWorkerId(workerId
)
1669 ).taskFunctionNames
= taskFunctionNames
1673 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1674 const { workerId
, ready
, taskFunctionNames
} = message
1675 if (ready
=== false) {
1676 throw new Error(`Worker ${workerId as number} failed to initialize`)
1678 const workerInfo
= this.getWorkerInfo(
1679 this.getWorkerNodeKeyByWorkerId(workerId
)
1681 workerInfo
.ready
= ready
as boolean
1682 workerInfo
.taskFunctionNames
= taskFunctionNames
1683 if (!this.readyEventEmitted
&& this.ready
) {
1684 this.readyEventEmitted
= true
1685 this.emitter
?.emit(PoolEvents
.ready
, this.info
)
1689 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1690 const { workerId
, taskId
, workerError
, data
} = message
1691 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1692 if (promiseResponse
!= null) {
1693 const { resolve
, reject
, workerNodeKey
, asyncResource
} = promiseResponse
1694 const workerNode
= this.workerNodes
[workerNodeKey
]
1695 if (workerError
!= null) {
1696 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1697 asyncResource
!= null
1698 ? asyncResource
.runInAsyncScope(
1703 : reject(workerError
.message
)
1705 asyncResource
!= null
1706 ? asyncResource
.runInAsyncScope(resolve
, this.emitter
, data
)
1707 : resolve(data
as Response
)
1709 asyncResource
?.emitDestroy()
1710 this.afterTaskExecutionHook(workerNodeKey
, message
)
1711 this.promiseResponseMap
.delete(taskId
as string)
1712 workerNode
?.emit('taskFinished', taskId
)
1713 if (this.opts
.enableTasksQueue
=== true && !this.destroying
) {
1714 const workerNodeTasksUsage
= workerNode
.usage
.tasks
1716 this.tasksQueueSize(workerNodeKey
) > 0 &&
1717 workerNodeTasksUsage
.executing
<
1718 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1722 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1726 workerNodeTasksUsage
.executing
=== 0 &&
1727 this.tasksQueueSize(workerNodeKey
) === 0 &&
1728 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1730 workerNode
.emit('idleWorkerNode', {
1731 workerId
: workerId
as number,
1739 private checkAndEmitTaskExecutionEvents (): void {
1741 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1745 private checkAndEmitTaskQueuingEvents (): void {
1746 if (this.hasBackPressure()) {
1747 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1751 private checkAndEmitDynamicWorkerCreationEvents (): void {
1752 if (this.type === PoolTypes
.dynamic
) {
1754 this.emitter
?.emit(PoolEvents
.full
, this.info
)
1760 * Gets the worker information given its worker node key.
1762 * @param workerNodeKey - The worker node key.
1763 * @returns The worker information.
1765 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
{
1766 return this.workerNodes
[workerNodeKey
]?.info
1770 * Creates a worker node.
1772 * @returns The created worker node.
1774 private createWorkerNode (): IWorkerNode
<Worker
, Data
> {
1775 const workerNode
= new WorkerNode
<Worker
, Data
>(
1780 workerOptions
: this.opts
.workerOptions
,
1781 tasksQueueBackPressureSize
:
1782 this.opts
.tasksQueueOptions
?.size
??
1783 getDefaultTasksQueueOptions(this.maxSize
).size
1786 // Flag the worker node as ready at pool startup.
1787 if (this.starting
) {
1788 workerNode
.info
.ready
= true
1794 * Adds the given worker node in the pool worker nodes.
1796 * @param workerNode - The worker node.
1797 * @returns The added worker node key.
1798 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1800 private addWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): number {
1801 this.workerNodes
.push(workerNode
)
1802 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
1803 if (workerNodeKey
=== -1) {
1804 throw new Error('Worker added not found in worker nodes')
1806 return workerNodeKey
1810 * Removes the worker node from the pool worker nodes.
1812 * @param workerNode - The worker node.
1814 private removeWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): void {
1815 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
1816 if (workerNodeKey
!== -1) {
1817 this.workerNodes
.splice(workerNodeKey
, 1)
1818 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
1822 protected flagWorkerNodeAsNotReady (workerNodeKey
: number): void {
1823 this.getWorkerInfo(workerNodeKey
).ready
= false
1827 public hasWorkerNodeBackPressure (workerNodeKey
: number): boolean {
1829 this.opts
.enableTasksQueue
=== true &&
1830 this.workerNodes
[workerNodeKey
].hasBackPressure()
1834 private hasBackPressure (): boolean {
1836 this.opts
.enableTasksQueue
=== true &&
1837 this.workerNodes
.findIndex(
1838 workerNode
=> !workerNode
.hasBackPressure()
1844 * Executes the given task on the worker given its worker node key.
1846 * @param workerNodeKey - The worker node key.
1847 * @param task - The task to execute.
1849 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1850 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1851 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1852 this.checkAndEmitTaskExecutionEvents()
1855 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1856 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1857 this.checkAndEmitTaskQueuingEvents()
1858 return tasksQueueSize
1861 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1862 return this.workerNodes
[workerNodeKey
].dequeueTask()
1865 private tasksQueueSize (workerNodeKey
: number): number {
1866 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1869 protected flushTasksQueue (workerNodeKey
: number): number {
1870 let flushedTasks
= 0
1871 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1874 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1878 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1882 private flushTasksQueues (): void {
1883 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1884 this.flushTasksQueue(workerNodeKey
)