1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
5 import { AsyncResource
} from
'node:async_hooks'
8 PromiseResponseWrapper
,
10 } from
'../utility-types'
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
25 import { KillBehaviors
} from
'../worker/worker-options'
26 import type { TaskFunction
} from
'../worker/task-functions'
34 type TasksQueueOptions
41 WorkerNodeEventDetail
,
47 WorkerChoiceStrategies
,
48 type WorkerChoiceStrategy
,
49 type WorkerChoiceStrategyOptions
50 } from
'./selection-strategies/selection-strategies-types'
51 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
52 import { version
} from
'./version'
53 import { WorkerNode
} from
'./worker-node'
56 checkValidTasksQueueOptions
,
57 checkValidWorkerChoiceStrategy
,
59 updateRunTimeWorkerUsage
,
60 updateTaskStatisticsWorkerUsage
,
61 updateWaitTimeWorkerUsage
,
66 * Base class that implements some shared logic for all poolifier pools.
68 * @typeParam Worker - Type of worker which manages this pool.
69 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
70 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
72 export abstract class AbstractPool
<
73 Worker
extends IWorker
,
76 > implements IPool
<Worker
, Data
, Response
> {
78 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
81 public emitter
?: EventEmitterAsyncResource
84 * Dynamic pool maximum size property placeholder.
86 protected readonly max
?: number
89 * The task execution response promise map:
90 * - `key`: The message id of each submitted task.
91 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
93 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
95 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
96 new Map
<string, PromiseResponseWrapper
<Response
>>()
99 * Worker choice strategy context referencing a worker choice algorithm implementation.
101 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
108 * The task functions added at runtime map:
109 * - `key`: The task function name.
110 * - `value`: The task function itself.
112 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
115 * Whether the pool is started or not.
117 private started
: boolean
119 * Whether the pool is starting or not.
121 private starting
: boolean
123 * Whether the pool is destroying or not.
125 private destroying
: boolean
127 * Whether the pool ready event has been emitted or not.
129 private readyEventEmitted
: boolean
131 * The start timestamp of the pool.
133 private readonly startTimestamp
136 * Constructs a new poolifier pool.
138 * @param numberOfWorkers - Number of workers that this pool should manage.
139 * @param filePath - Path to the worker file.
140 * @param opts - Options for the pool.
143 protected readonly numberOfWorkers
: number,
144 protected readonly filePath
: string,
145 protected readonly opts
: PoolOptions
<Worker
>
147 if (!this.isMain()) {
149 'Cannot start a pool from a worker with the same type as the pool'
152 checkFilePath(this.filePath
)
153 this.checkNumberOfWorkers(this.numberOfWorkers
)
154 this.checkPoolOptions(this.opts
)
156 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
157 this.executeTask
= this.executeTask
.bind(this)
158 this.enqueueTask
= this.enqueueTask
.bind(this)
160 if (this.opts
.enableEvents
=== true) {
161 this.initializeEventEmitter()
163 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
169 this.opts
.workerChoiceStrategy
,
170 this.opts
.workerChoiceStrategyOptions
175 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
178 this.starting
= false
179 this.destroying
= false
180 this.readyEventEmitted
= false
181 if (this.opts
.startWorkers
=== true) {
185 this.startTimestamp
= performance
.now()
188 private checkNumberOfWorkers (numberOfWorkers
: number): void {
189 if (numberOfWorkers
== null) {
191 'Cannot instantiate a pool without specifying the number of workers'
193 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
195 'Cannot instantiate a pool with a non safe integer number of workers'
197 } else if (numberOfWorkers
< 0) {
198 throw new RangeError(
199 'Cannot instantiate a pool with a negative number of workers'
201 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
202 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
206 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
207 if (isPlainObject(opts
)) {
208 this.opts
.startWorkers
= opts
.startWorkers
?? true
209 checkValidWorkerChoiceStrategy(
210 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
212 this.opts
.workerChoiceStrategy
=
213 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
214 this.checkValidWorkerChoiceStrategyOptions(
215 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
217 this.opts
.workerChoiceStrategyOptions
= {
218 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
219 ...opts
.workerChoiceStrategyOptions
221 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
222 this.opts
.enableEvents
= opts
.enableEvents
?? true
223 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
224 if (this.opts
.enableTasksQueue
) {
225 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
226 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
227 opts
.tasksQueueOptions
as TasksQueueOptions
231 throw new TypeError('Invalid pool options: must be a plain object')
235 private checkValidWorkerChoiceStrategyOptions (
236 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
239 workerChoiceStrategyOptions
!= null &&
240 !isPlainObject(workerChoiceStrategyOptions
)
243 'Invalid worker choice strategy options: must be a plain object'
247 workerChoiceStrategyOptions
?.retries
!= null &&
248 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
251 'Invalid worker choice strategy options: retries must be an integer'
255 workerChoiceStrategyOptions
?.retries
!= null &&
256 workerChoiceStrategyOptions
.retries
< 0
258 throw new RangeError(
259 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
263 workerChoiceStrategyOptions
?.weights
!= null &&
264 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
267 'Invalid worker choice strategy options: must have a weight for each worker node'
271 workerChoiceStrategyOptions
?.measurement
!= null &&
272 !Object.values(Measurements
).includes(
273 workerChoiceStrategyOptions
.measurement
277 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
/**
 * Creates the pool event emitter as an `EventEmitterAsyncResource` whose
 * name is built from the pool type and the pool worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
289 public get
info (): PoolInfo
{
294 started
: this.started
,
296 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
297 minSize
: this.minSize
,
298 maxSize
: this.maxSize
,
299 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
300 .runTime
.aggregate
&&
301 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
302 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
303 workerNodes
: this.workerNodes
.length
,
304 idleWorkerNodes
: this.workerNodes
.reduce(
305 (accumulator
, workerNode
) =>
306 workerNode
.usage
.tasks
.executing
=== 0
311 busyWorkerNodes
: this.workerNodes
.reduce(
312 (accumulator
, _workerNode
, workerNodeKey
) =>
313 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
316 executedTasks
: this.workerNodes
.reduce(
317 (accumulator
, workerNode
) =>
318 accumulator
+ workerNode
.usage
.tasks
.executed
,
321 executingTasks
: this.workerNodes
.reduce(
322 (accumulator
, workerNode
) =>
323 accumulator
+ workerNode
.usage
.tasks
.executing
,
326 ...(this.opts
.enableTasksQueue
=== true && {
327 queuedTasks
: this.workerNodes
.reduce(
328 (accumulator
, workerNode
) =>
329 accumulator
+ workerNode
.usage
.tasks
.queued
,
333 ...(this.opts
.enableTasksQueue
=== true && {
334 maxQueuedTasks
: this.workerNodes
.reduce(
335 (accumulator
, workerNode
) =>
336 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
340 ...(this.opts
.enableTasksQueue
=== true && {
341 backPressure
: this.hasBackPressure()
343 ...(this.opts
.enableTasksQueue
=== true && {
344 stolenTasks
: this.workerNodes
.reduce(
345 (accumulator
, workerNode
) =>
346 accumulator
+ workerNode
.usage
.tasks
.stolen
,
350 failedTasks
: this.workerNodes
.reduce(
351 (accumulator
, workerNode
) =>
352 accumulator
+ workerNode
.usage
.tasks
.failed
,
355 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
356 .runTime
.aggregate
&& {
360 ...this.workerNodes
.map(
361 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
367 ...this.workerNodes
.map(
368 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
372 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
373 .runTime
.average
&& {
376 this.workerNodes
.reduce
<number[]>(
377 (accumulator
, workerNode
) =>
378 accumulator
.concat(workerNode
.usage
.runTime
.history
),
384 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
388 this.workerNodes
.reduce
<number[]>(
389 (accumulator
, workerNode
) =>
390 accumulator
.concat(workerNode
.usage
.runTime
.history
),
398 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
399 .waitTime
.aggregate
&& {
403 ...this.workerNodes
.map(
404 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
410 ...this.workerNodes
.map(
411 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
415 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
416 .waitTime
.average
&& {
419 this.workerNodes
.reduce
<number[]>(
420 (accumulator
, workerNode
) =>
421 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
427 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
428 .waitTime
.median
&& {
431 this.workerNodes
.reduce
<number[]>(
432 (accumulator
, workerNode
) =>
433 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
445 * The pool readiness boolean status.
447 private get
ready (): boolean {
449 this.workerNodes
.reduce(
450 (accumulator
, workerNode
) =>
451 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
/**
 * The approximate pool utilization.
 *
 * It is the sum of the aggregated tasks run time and wait time of every
 * worker node, divided by the pool time capacity (time elapsed since the
 * pool start timestamp multiplied by the pool maximum size).
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const timeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  // Worker nodes without aggregated statistics contribute zero.
  let runTimeSum = 0
  for (const workerNode of this.workerNodes) {
    runTimeSum += workerNode.usage.runTime?.aggregate ?? 0
  }
  let waitTimeSum = 0
  for (const workerNode of this.workerNodes) {
    waitTimeSum += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (runTimeSum + waitTimeSum) / timeCapacity
}
483 * If it is `'dynamic'`, it provides the `max` property.
485 protected abstract get
type (): PoolType
490 protected abstract get
worker (): WorkerType
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
/**
 * The pool maximum size: the `max` property when it is defined, the number
 * of workers given at construction otherwise.
 */
protected get maxSize (): number {
  const { max } = this
  if (max != null) {
    return max
  }
  return this.numberOfWorkers
}
507 * Checks if the worker id sent in the received message from a worker is valid.
509 * @param message - The received message.
510 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
512 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
513 if (message
.workerId
== null) {
514 throw new Error('Worker message received without worker id')
515 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
517 `Worker message received from unknown worker '${message.workerId}'`
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    // Workers are matched by identity.
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  // Validate first so an invalid strategy never reaches the pool options.
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node gets its usage reset, then a statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options take precedence over the default ones.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isDisabling = this.opts.enableTasksQueue === true && !enable
  if (isDisabling) {
    // Going from enabled to disabled: remove the task stealing handlers
    // and flush the tasks queues before the flag is switched off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue not enabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueSize(builtOptions.size as number)
  const taskStealing = builtOptions.taskStealing === true
  // The handlers are always removed first, then added back only when the
  // option is enabled (presumably so a handler is never registered twice).
  this.unsetTaskStealing()
  if (taskStealing) {
    this.setTaskStealing()
  }
  const tasksStealingOnBackPressure =
    builtOptions.tasksStealingOnBackPressure === true
  this.unsetTasksStealingOnBackPressure()
  if (tasksStealingOnBackPressure) {
    this.setTasksStealingOnBackPressure()
  }
}
617 private buildTasksQueueOptions (
618 tasksQueueOptions
: TasksQueueOptions
619 ): TasksQueueOptions
{
622 size
: Math.pow(this.maxSize
, 2),
625 tasksStealingOnBackPressure
: true
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The size to set.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
637 private setTaskStealing (): void {
638 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
639 this.workerNodes
[workerNodeKey
].on(
641 this.handleIdleWorkerNodeEvent
646 private unsetTaskStealing (): void {
647 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
648 this.workerNodes
[workerNodeKey
].off(
650 this.handleIdleWorkerNodeEvent
655 private setTasksStealingOnBackPressure (): void {
656 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
657 this.workerNodes
[workerNodeKey
].on(
659 this.handleBackPressureEvent
664 private unsetTasksStealingOnBackPressure (): void {
665 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
666 this.workerNodes
[workerNodeKey
].off(
668 this.handleBackPressureEvent
/**
 * Whether the pool is full or not.
 *
 * The pool filling boolean status: `true` once the number of worker nodes
 * has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
683 * Whether the pool is busy or not.
685 * The pool busyness boolean status.
687 protected abstract get
busy (): boolean
690 * Whether worker nodes are concurrently executing their tasks quota or not.
692 * @returns Worker nodes busyness boolean status.
694 protected internalBusy (): boolean {
695 if (this.opts
.enableTasksQueue
=== true) {
697 this.workerNodes
.findIndex(
699 workerNode
.info
.ready
&&
700 workerNode
.usage
.tasks
.executing
<
701 (this.opts
.tasksQueueOptions
?.concurrency
as number)
706 this.workerNodes
.findIndex(
708 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
/**
 * Whether the worker node is busy or not.
 *
 * With the tasks queue enabled, a worker node is busy once it executes at
 * least `tasksQueueOptions.concurrency` tasks; otherwise it is busy as soon
 * as it executes one task.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  if (this.opts.enableTasksQueue === true) {
    const concurrency = this.opts.tasksQueueOptions?.concurrency as number
    return executing >= concurrency
  }
  return executing > 0
}
723 private async sendTaskFunctionOperationToWorker (
724 workerNodeKey
: number,
725 message
: MessageValue
<Data
>
726 ): Promise
<boolean> {
727 return await new Promise
<boolean>((resolve
, reject
) => {
728 const taskFunctionOperationListener
= (
729 message
: MessageValue
<Response
>
731 this.checkMessageWorkerId(message
)
732 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
734 message
.taskFunctionOperationStatus
!= null &&
735 message
.workerId
=== workerId
737 if (message
.taskFunctionOperationStatus
) {
739 } else if (!message
.taskFunctionOperationStatus
) {
742 `Task function operation '${
743 message.taskFunctionOperation as string
744 }' failed on worker ${message.workerId} with error: '${
745 message.workerError?.message as string
750 this.deregisterWorkerMessageListener(
751 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
752 taskFunctionOperationListener
756 this.registerWorkerMessageListener(
758 taskFunctionOperationListener
760 this.sendToWorker(workerNodeKey
, message
)
764 private async sendTaskFunctionOperationToWorkers (
765 message
: MessageValue
<Data
>
766 ): Promise
<boolean> {
767 return await new Promise
<boolean>((resolve
, reject
) => {
768 const responsesReceived
= new Array<MessageValue
<Response
>>()
769 const taskFunctionOperationsListener
= (
770 message
: MessageValue
<Response
>
772 this.checkMessageWorkerId(message
)
773 if (message
.taskFunctionOperationStatus
!= null) {
774 responsesReceived
.push(message
)
775 if (responsesReceived
.length
=== this.workerNodes
.length
) {
777 responsesReceived
.every(
778 message
=> message
.taskFunctionOperationStatus
=== true
783 responsesReceived
.some(
784 message
=> message
.taskFunctionOperationStatus
=== false
787 const errorResponse
= responsesReceived
.find(
788 response
=> response
.taskFunctionOperationStatus
=== false
792 `Task function operation '${
793 message.taskFunctionOperation as string
794 }' failed on worker ${
795 errorResponse?.workerId as number
797 errorResponse?.workerError?.message as string
802 this.deregisterWorkerMessageListener(
803 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
804 taskFunctionOperationsListener
809 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
810 this.registerWorkerMessageListener(
812 taskFunctionOperationsListener
814 this.sendToWorker(workerNodeKey
, message
)
820 public hasTaskFunction (name
: string): boolean {
821 for (const workerNode
of this.workerNodes
) {
823 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
824 workerNode
.info
.taskFunctionNames
.includes(name
)
833 public async addTaskFunction (
835 fn
: TaskFunction
<Data
, Response
>
836 ): Promise
<boolean> {
837 if (typeof name
!== 'string') {
838 throw new TypeError('name argument must be a string')
840 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
841 throw new TypeError('name argument must not be an empty string')
843 if (typeof fn
!== 'function') {
844 throw new TypeError('fn argument must be a function')
846 const opResult
= await this.sendTaskFunctionOperationToWorkers({
847 taskFunctionOperation
: 'add',
848 taskFunctionName
: name
,
849 taskFunction
: fn
.toString()
851 this.taskFunctions
.set(name
, fn
)
856 public async removeTaskFunction (name
: string): Promise
<boolean> {
857 if (!this.taskFunctions
.has(name
)) {
859 'Cannot remove a task function not handled on the pool side'
862 const opResult
= await this.sendTaskFunctionOperationToWorkers({
863 taskFunctionOperation
: 'remove',
864 taskFunctionName
: name
866 this.deleteTaskFunctionWorkerUsages(name
)
867 this.taskFunctions
.delete(name
)
872 public listTaskFunctionNames (): string[] {
873 for (const workerNode
of this.workerNodes
) {
875 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
876 workerNode
.info
.taskFunctionNames
.length
> 0
878 return workerNode
.info
.taskFunctionNames
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  // Sends the 'default' task function operation to the pool worker nodes.
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operationMessage)
}
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
/**
 * Whether a task shall be executed directly on the worker node rather than
 * enqueued: its tasks queue must be empty and it must execute fewer tasks
 * than `tasksQueueOptions.concurrency`.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed directly, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return executing < (this.opts.tasksQueueOptions?.concurrency as number)
}
907 public async execute (
910 transferList
?: TransferListItem
[]
911 ): Promise
<Response
> {
912 return await new Promise
<Response
>((resolve
, reject
) => {
914 reject(new Error('Cannot execute a task on not started pool'))
917 if (this.destroying
) {
918 reject(new Error('Cannot execute a task on destroying pool'))
921 if (name
!= null && typeof name
!== 'string') {
922 reject(new TypeError('name argument must be a string'))
927 typeof name
=== 'string' &&
928 name
.trim().length
=== 0
930 reject(new TypeError('name argument must not be an empty string'))
933 if (transferList
!= null && !Array.isArray(transferList
)) {
934 reject(new TypeError('transferList argument must be an array'))
937 const timestamp
= performance
.now()
938 const workerNodeKey
= this.chooseWorkerNode()
939 const task
: Task
<Data
> = {
940 name
: name
?? DEFAULT_TASK_NAME
,
941 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
942 data
: data
?? ({} as Data
),
947 this.promiseResponseMap
.set(task
.taskId
as string, {
951 ...(this.emitter
!= null && {
952 asyncResource
: new AsyncResource('poolifier:task', {
953 triggerAsyncId
: this.emitter
.asyncId
,
954 requireManualDestroy
: true
959 this.opts
.enableTasksQueue
=== false ||
960 (this.opts
.enableTasksQueue
=== true &&
961 this.shallExecuteTask(workerNodeKey
))
963 this.executeTask(workerNodeKey
, task
)
965 this.enqueueTask(workerNodeKey
, task
)
971 public start (): void {
973 throw new Error('Cannot start an already started pool')
976 throw new Error('Cannot start an already starting pool')
978 if (this.destroying
) {
979 throw new Error('Cannot start a destroying pool')
983 this.workerNodes
.reduce(
984 (accumulator
, workerNode
) =>
985 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
987 ) < this.numberOfWorkers
989 this.createAndSetupWorkerNode()
991 this.starting
= false
996 public async destroy (): Promise
<void> {
998 throw new Error('Cannot destroy an already destroyed pool')
1000 if (this.starting
) {
1001 throw new Error('Cannot destroy an starting pool')
1003 if (this.destroying
) {
1004 throw new Error('Cannot destroy an already destroying pool')
1006 this.destroying
= true
1008 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
1009 await this.destroyWorkerNode(workerNodeKey
)
1012 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
1013 this.emitter
?.emitDestroy()
1014 this.emitter
?.removeAllListeners()
1015 this.readyEventEmitted
= false
1016 this.destroying
= false
1017 this.started
= false
1020 protected async sendKillMessageToWorker (
1021 workerNodeKey
: number
1023 await new Promise
<void>((resolve
, reject
) => {
1024 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1025 this.checkMessageWorkerId(message
)
1026 if (message
.kill
=== 'success') {
1028 } else if (message
.kill
=== 'failure') {
1031 `Kill message handling failed on worker ${
1032 message.workerId as number
1038 // FIXME: should be registered only once
1039 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1040 this.sendToWorker(workerNodeKey
, { kill
: true })
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is first flagged as not ready and its tasks queue is
 * flushed. Once the `'taskFinished'` wait has settled (presumably one event
 * per flushed task - see `waitWorkerNodeEvents`), the kill message is sent
 * and the worker node is terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasksCount = this.flushTasksQueue(workerNodeKey)
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  await waitWorkerNodeEvents(
    targetWorkerNode,
    'taskFinished',
    flushedTasksCount
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await targetWorkerNode.terminate()
}
1059 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1060 * Can be overridden.
1064 protected setupHook (): void {
1065 /* Intentionally empty */
1069 * Should return whether the worker is the main worker or not.
1071 protected abstract isMain (): boolean
1074 * Hook executed before the worker task execution.
1075 * Can be overridden.
1077 * @param workerNodeKey - The worker node key.
1078 * @param task - The task to execute.
1080 protected beforeTaskExecutionHook (
1081 workerNodeKey
: number,
1084 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1085 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1086 ++workerUsage
.tasks
.executing
1087 updateWaitTimeWorkerUsage(
1088 this.workerChoiceStrategyContext
,
1094 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1095 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1099 const taskFunctionWorkerUsage
= this.workerNodes
[
1101 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1102 ++taskFunctionWorkerUsage
.tasks
.executing
1103 updateWaitTimeWorkerUsage(
1104 this.workerChoiceStrategyContext
,
1105 taskFunctionWorkerUsage
,
1112 * Hook executed after the worker task execution.
1113 * Can be overridden.
1115 * @param workerNodeKey - The worker node key.
1116 * @param message - The received message.
1118 protected afterTaskExecutionHook (
1119 workerNodeKey
: number,
1120 message
: MessageValue
<Response
>
1122 let needWorkerChoiceStrategyUpdate
= false
1123 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1124 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1125 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1126 updateRunTimeWorkerUsage(
1127 this.workerChoiceStrategyContext
,
1131 updateEluWorkerUsage(
1132 this.workerChoiceStrategyContext
,
1136 needWorkerChoiceStrategyUpdate
= true
1139 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1140 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1141 message
.taskPerformance
?.name
as string
1144 const taskFunctionWorkerUsage
= this.workerNodes
[
1146 ].getTaskFunctionWorkerUsage(
1147 message
.taskPerformance
?.name
as string
1149 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1150 updateRunTimeWorkerUsage(
1151 this.workerChoiceStrategyContext
,
1152 taskFunctionWorkerUsage
,
1155 updateEluWorkerUsage(
1156 this.workerChoiceStrategyContext
,
1157 taskFunctionWorkerUsage
,
1160 needWorkerChoiceStrategyUpdate
= true
1162 if (needWorkerChoiceStrategyUpdate
) {
1163 this.workerChoiceStrategyContext
.update(workerNodeKey
)
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * That is the case when the worker information holds an array of more than
 * two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return false
  }
  const { taskFunctionNames } = workerInfo
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * When a dynamic worker node is created and the strategy policy allows
 * dynamic worker usage, that new worker node is chosen directly.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
/**
 * Conditions for dynamic worker creation: the pool is dynamic, it is not
 * full and its worker nodes are internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1211 * Sends a message to the worker given its worker node key.
1213 * @param workerNodeKey - The worker node key.
1214 * @param message - The message.
1215 * @param transferList - The optional array of transferable objects.
1217 protected abstract sendToWorker (
1218 workerNodeKey
: number,
1219 message
: MessageValue
<Data
>,
1220 transferList
?: TransferListItem
[]
1224 * Creates a new, completely set up worker node.
1226 * @returns New, completely set up worker node key.
1228 protected createAndSetupWorkerNode (): number {
1229 const workerNode
= this.createWorkerNode()
1230 workerNode
.registerWorkerEventHandler(
1232 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1234 workerNode
.registerWorkerEventHandler(
1236 this.opts
.messageHandler
?? EMPTY_FUNCTION
1238 workerNode
.registerWorkerEventHandler(
1240 this.opts
.errorHandler
?? EMPTY_FUNCTION
1242 workerNode
.registerWorkerEventHandler('error', (error
: Error) => {
1243 workerNode
.info
.ready
= false
1244 this.emitter
?.emit(PoolEvents
.error
, error
)
1249 this.opts
.restartWorkerOnError
=== true
1251 if (workerNode
.info
.dynamic
) {
1252 this.createAndSetupDynamicWorkerNode()
1254 this.createAndSetupWorkerNode()
1257 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1258 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
1260 workerNode
.terminate().catch(error
=> {
1261 this.emitter
?.emit(PoolEvents
.error
, error
)
1264 workerNode
.registerWorkerEventHandler(
1266 this.opts
.exitHandler
?? EMPTY_FUNCTION
1268 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1269 this.removeWorkerNode(workerNode
)
1271 const workerNodeKey
= this.addWorkerNode(workerNode
)
1272 this.afterWorkerNodeSetup(workerNodeKey
)
1273 return workerNodeKey
// Builds a worker node through createAndSetupWorkerNode(), then layers the dynamic worker
// behavior on top of it: a kill message listener, the registration of the pool task functions
// and the dynamic/ready flags. Returns the key of the new worker node.
1277 * Creates a new, completely set up dynamic worker node.
1279 * @returns New, completely set up dynamic worker node key.
1281 protected createAndSetupDynamicWorkerNode (): number {
1282 const workerNodeKey
= this.createAndSetupWorkerNode()
1283 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1284 this.checkMessageWorkerId(message
)
// The key is resolved again inside the listener into a separate local variable.
// NOTE(review): presumably because keys can shift when worker nodes are removed - confirm.
1285 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1288 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1289 // Kill message received from worker
// A HARD kill always matches. A SOFT kill matches only when no task is executing and,
// when the tasks queue is enabled, when the worker node tasks queue is also empty.
1291 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1292 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1293 ((this.opts
.enableTasksQueue
=== false &&
1294 workerUsage
.tasks
.executing
=== 0) ||
1295 (this.opts
.enableTasksQueue
=== true &&
1296 workerUsage
.tasks
.executing
=== 0 &&
1297 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1299 // Flag the worker node as not ready immediately
1300 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
// Destruction is asynchronous: a failure is reported through the pool emitter.
1301 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1302 this.emitter
?.emit(PoolEvents
.error
, error
)
1306 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1307 this.sendToWorker(workerNodeKey
, {
// Every task function known to the pool is sent to the new worker with the 'add' operation,
// serialized as source text through toString().
1310 if (this.taskFunctions
.size
> 0) {
1311 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1312 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1313 taskFunctionOperation
: 'add',
1315 taskFunction
: taskFunction
.toString()
1317 this.emitter
?.emit(PoolEvents
.error
, error
)
1321 workerInfo
.dynamic
= true
// NOTE(review): the ready flag assignment below appears to be conditioned on the strategy
// policy reporting dynamicWorkerReady or dynamicWorkerUsage - confirm against the full file.
1323 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1324 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1326 workerInfo
.ready
= true
1328 this.checkAndEmitDynamicWorkerCreationEvents()
1329 return workerNodeKey
/**
 * Attaches a message listener callback to the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload: task data or task execution response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Attaches, once, a message listener callback to the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload: task data or task execution response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Detaches a message listener callback from the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload: task data or task execution response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
// Post-creation hook of a worker node: registers the pool message listener on the worker,
// sends it the startup and statistics messages and, only when the tasks queue is enabled,
// subscribes the task stealing handlers on the worker node.
1372 * Method hooked up after a worker node has been newly created.
1373 * Can be overridden.
1375 * @param workerNodeKey - The newly created worker node key.
1377 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1378 // Listen to worker messages.
1379 this.registerWorkerMessageListener(
1381 this.workerMessageListener
1383 // Send the startup message to worker.
1384 this.sendStartupMessageToWorker(workerNodeKey
)
1385 // Send the statistics message to worker.
1386 this.sendStatisticsMessageToWorker(workerNodeKey
)
1387 if (this.opts
.enableTasksQueue
=== true) {
// Stealing handled by handleIdleWorkerNodeEvent is opt-in through tasksQueueOptions.taskStealing.
1388 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1389 this.workerNodes
[workerNodeKey
].on(
1391 this.handleIdleWorkerNodeEvent
// Stealing handled by handleBackPressureEvent is opt-in through
// tasksQueueOptions.tasksStealingOnBackPressure.
1394 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1395 this.workerNodes
[workerNodeKey
].on(
1397 this.handleBackPressureEvent
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (
  workerNodeKey: number
): void
// Sends to the worker a message whose fields are read from the task statistics requirements
// of the current worker choice strategy context.
1411 * Sends the statistics message to worker given its worker node key.
1413 * @param workerNodeKey - The worker node key.
1415 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1416 this.sendToWorker(workerNodeKey
, {
// getTaskStatisticsRequirements() is queried once per payload field.
1419 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1421 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
/**
 * Hands a task over to a worker node: the task is executed right away when
 * `shallExecuteTask()` allows it, otherwise it is enqueued on the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute or enqueue.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
/**
 * Moves every task queued on the given worker node to the other worker nodes.
 * Each task goes to the ready worker node that currently has the fewest queued tasks.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 *   A key of `-1` (worker node not found in the pool) is ignored.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  // The error handler calls this method with the result of `indexOf()`: the worker node may
  // already have been removed from the pool, in which case there is no queue to redistribute.
  if (workerNodeKey === -1 || this.workerNodes.length <= 1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // Pick the ready worker node with the fewest queued tasks, starting from the first one.
    const destinationWorkerNodeKey = this.workerNodes.reduce(
      (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
        return workerNode.info.ready &&
          workerNode.usage.tasks.queued <
            workerNodes[minWorkerNodeKey].usage.tasks.queued
          ? workerNodeKey
          : minWorkerNodeKey
      },
      0
    )
    this.handleTask(
      destinationWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
/**
 * Increments the stolen tasks counter of a worker node and, when task function usage is
 * tracked for that worker node, the stolen tasks counter of the given task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single guarded lookup: the worker node may be missing, same as for the counter above.
    const taskFunctionWorkerUsage =
      workerNode?.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.stolen
    }
  }
}
/**
 * Increments the sequentially stolen tasks counter of a worker node.
 * Does nothing when the worker node or its usage is not available.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage == null) {
    return
  }
  usage.tasks.sequentiallyStolen += 1
}
/**
 * Increments the sequentially stolen tasks counter of a task function on a worker node,
 * when task function usage is tracked for that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Single guarded lookup: tolerate a worker node that is no longer in the pool.
  const taskFunctionWorkerUsage =
    this.workerNodes[workerNodeKey]?.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
  }
}
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node.
 * Does nothing when the worker node or its usage is not available.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage == null) {
    return
  }
  usage.tasks.sequentiallyStolen = 0
}
/**
 * Resets to zero the sequentially stolen tasks counter of a task function on a worker node,
 * when task function usage is tracked for that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Single guarded lookup: tolerate a worker node that is no longer in the pool.
  const taskFunctionWorkerUsage =
    this.workerNodes[workerNodeKey]?.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
// Worker node event handler subscribed in afterWorkerNodeSetup() when
// tasksQueueOptions.taskStealing is enabled. It steals a task for the worker node referenced
// by the event detail and maintains the "sequentially stolen" counters.
// `previousStolenTask` is the task stolen by the previous invocation, if any.
1526 private readonly handleIdleWorkerNodeEvent
= (
1527 eventDetail
: WorkerNodeEventDetail
,
1528 previousStolenTask
?: Task
<Data
>
// A single worker node pool has no other worker node to steal from.
1530 if (this.workerNodes
.length
<= 1) {
1533 const { workerNodeKey
} = eventDetail
1534 if (workerNodeKey
== null) {
1536 'WorkerNode event detail workerNodeKey attribute must be defined'
1539 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
// After a previous steal, a worker node that is executing or has queued tasks gets its
// sequentially stolen counters reset, per task function and then globally.
1541 previousStolenTask
!= null &&
1542 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1543 (workerNodeTasksUsage
.executing
> 0 ||
1544 this.tasksQueueSize(workerNodeKey
) > 0)
1546 for (const taskName
of this.workerNodes
[workerNodeKey
].info
1547 .taskFunctionNames
as string[]) {
1548 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1553 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1556 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1558 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1561 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1563 ].getTaskFunctionWorkerUsage(stolenTask
.name
as string)
1564 ?.tasks
as TaskStatistics
// The task function counter is incremented on a first steal or when the same task function
// is stolen again in a row; the other call below resets it.
1566 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1567 (previousStolenTask
!= null &&
1568 previousStolenTask
.name
=== stolenTask
.name
&&
1569 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1571 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1573 stolenTask
.name
as string
1576 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1578 stolenTask
.name
as string
// NOTE(review): the handler appears to re-invoke itself with the task just stolen after a
// delay computed by exponentialDelay() from the sequentially stolen count - confirm against
// the full file. Rejections of that chain are swallowed with EMPTY_FUNCTION.
1582 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1584 this.handleIdleWorkerNodeEvent(eventDetail
, stolenTask
)
1587 .catch(EMPTY_FUNCTION
)
/**
 * Steals one queued task for the given worker node from the ready worker node that has the
 * most queued tasks, hands it over to the given worker node and updates its stolen tasks
 * statistics.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, or `undefined` when no other ready worker node has queued tasks.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  // Sort a copy, by descending queued tasks count, so that the pool worker node keys stay stable.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  // Indexes in the sorted copy are not pool worker node keys: the stealing worker node is
  // excluded by identity, not by comparing an index of the sorted copy with its key.
  const sourceWorkerNode = workerNodes.find(
    sourceWorkerNode =>
      sourceWorkerNode.info.ready &&
      sourceWorkerNode !== destinationWorkerNode &&
      sourceWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode != null) {
    const task = sourceWorkerNode.popTask() as Task<Data>
    this.handleTask(workerNodeKey, task)
    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    this.updateTaskStolenStatisticsWorkerUsage(
      workerNodeKey,
      task.name as string
    )
    return task
  }
  return undefined
}
/**
 * Worker node event handler offloading tasks from a worker node under back pressure:
 * while the source worker node has queued tasks, each other ready worker node whose queue is
 * below the tasks queue size (minus an offset of one) receives one task popped from the source.
 *
 * @param eventDetail - The worker node event detail carrying the id of the source worker.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (this.workerNodes.length <= 1) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sort a copy, by ascending queued tasks count, so that the pool worker node keys stay stable.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      // Indexes in the sorted copy are not pool worker node keys: resolve the real key.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      this.handleTask(workerNodeKey, task)
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
/**
 * Message listener registered on each worker. Dispatches on the message content:
 * worker ready response, task execution response or task function names update.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
/**
 * Handles a worker ready response: records the ready flag and the task function names on the
 * worker information, then emits the pool `ready` event the first time the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const readyWorkerInfo = this.getWorkerInfo(readyWorkerNodeKey)
  readyWorkerInfo.ready = ready as boolean
  readyWorkerInfo.taskFunctionNames = taskFunctionNames
  // The pool ready event is emitted only once.
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
/**
 * Handles a task execution response: settles the promise bound to the task id, runs the
 * after task execution hook and, when the tasks queue is enabled, executes the next queued
 * task or signals that the worker node is idle.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else {
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(resolve, this.emitter, data)
    } else {
      resolve(data as Response)
    }
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  workerNode?.emit('taskFinished', taskId)
  // The worker node may no longer be in the pool when its response arrives: skip the tasks
  // queue handling instead of dereferencing an undefined worker node.
  if (this.opts.enableTasksQueue === true && workerNode != null) {
    const workerNodeTasksUsage = workerNode.usage.tasks
    if (
      this.tasksQueueSize(workerNodeKey) > 0 &&
      workerNodeTasksUsage.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
    if (
      workerNodeTasksUsage.executing === 0 &&
      this.tasksQueueSize(workerNodeKey) === 0 &&
      workerNodeTasksUsage.sequentiallyStolen === 0
    ) {
      workerNode.emit('idleWorkerNode', {
        workerId: workerId as number,
        workerNodeKey
      })
    }
  }
}
// Emits the pool `busy` event, with the pool information as payload, through the optional emitter.
// NOTE(review): the guarding condition is not visible in this view - confirm against the full file.
1742 private checkAndEmitTaskExecutionEvents (): void {
1744 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
/**
 * Emits the pool `backPressure` event, with the pool information as payload,
 * when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
// For dynamic pools only: emits the pool `full` event, with the pool information as payload,
// through the optional emitter.
// NOTE(review): an inner condition is not visible in this view - confirm against the full file.
1754 private checkAndEmitDynamicWorkerCreationEvents (): void {
1755 if (this.type === PoolTypes
.dynamic
) {
1757 this.emitter
?.emit(PoolEvents
.full
, this.info
)
/**
 * Gets the information of the worker node stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
// Instantiates a WorkerNode configured from the pool options.
1773 * Creates a worker node.
1775 * @returns The created worker node.
1777 private createWorkerNode (): IWorkerNode
<Worker
, Data
> {
1778 const workerNode
= new WorkerNode
<Worker
, Data
>(
1783 workerOptions
: this.opts
.workerOptions
,
// The back pressure size falls back to the square of maxSize when tasksQueueOptions.size is unset.
1784 tasksQueueBackPressureSize
:
1785 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1788 // Flag the worker node as ready at pool startup.
1789 if (this.starting
) {
1790 workerNode
.info
.ready
= true
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
/**
 * Removes the given worker node from the pool worker nodes and from the worker choice
 * strategy context. Does nothing when the worker node is not in the pool.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
/**
 * Sets the ready flag of a worker node to `false`.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
/**
 * Whether the worker node at the given key has back pressure.
 * Always `false` when the tasks queue is not enabled.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the tasks queue is enabled and the worker node has back pressure, `false` otherwise.
 */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no worker node is
 * without back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
/**
 * Runs the given task on the worker identified by its worker node key: invokes the before
 * task execution hook, sends the task with its transfer list to the worker, then checks
 * and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on a worker node, then checks and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
/**
 * Dequeues a task from the tasks queue of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
// Drains the tasks queue of a worker node, one dequeued task per loop iteration, then clears
// the queue on the worker node.
// NOTE(review): what is done with each dequeued task and the returned value are not visible
// in this view; presumably the count of flushed tasks is returned - confirm against the full file.
1871 protected flushTasksQueue (workerNodeKey
: number): number {
1872 let flushedTasks
= 0
1873 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1876 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1880 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1884 private flushTasksQueues (): void {
1885 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1886 this.flushTasksQueue(workerNodeKey
)