1 import { AsyncResource
} from
'node:async_hooks'
2 import { randomUUID
} from
'node:crypto'
3 import { EventEmitterAsyncResource
} from
'node:events'
4 import { performance
} from
'node:perf_hooks'
5 import type { TransferListItem
} from
'node:worker_threads'
9 PromiseResponseWrapper
,
11 } from
'../utility-types.js'
25 import type { TaskFunction
} from
'../worker/task-functions.js'
26 import { KillBehaviors
} from
'../worker/worker-options.js'
34 type TasksQueueOptions
38 WorkerChoiceStrategies
,
39 type WorkerChoiceStrategy
,
40 type WorkerChoiceStrategyOptions
41 } from
'./selection-strategies/selection-strategies-types.js'
42 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
45 checkValidTasksQueueOptions
,
46 checkValidWorkerChoiceStrategy
,
47 getDefaultTasksQueueOptions
,
49 updateRunTimeWorkerUsage
,
50 updateTaskStatisticsWorkerUsage
,
51 updateWaitTimeWorkerUsage
,
54 import { version
} from
'./version.js'
59 WorkerNodeEventDetail
,
62 import { WorkerNode
} from
'./worker-node.js'
65 * Base class that implements some shared logic for all poolifier pools.
67 * @typeParam Worker - Type of worker which manages this pool.
68 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
69 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
71 export abstract class AbstractPool
<
72 Worker
extends IWorker
,
75 > implements IPool
<Worker
, Data
, Response
> {
77 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
80 public emitter
?: EventEmitterAsyncResource
83 * The task execution response promise map:
84 * - `key`: The message id of each submitted task.
85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
89 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
90 new Map
<string, PromiseResponseWrapper
<Response
>>()
93 * Worker choice strategy context referencing a worker choice algorithm implementation.
95 protected workerChoiceStrategyContext
?: WorkerChoiceStrategyContext
<
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
106 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
109 * Whether the pool is started or not.
111 private started
: boolean
113 * Whether the pool is starting or not.
115 private starting
: boolean
117 * Whether the pool is destroying or not.
119 private destroying
: boolean
121 * Whether the pool ready event has been emitted or not.
123 private readyEventEmitted
: boolean
125 * The start timestamp of the pool.
127 private readonly startTimestamp
130 * Constructs a new poolifier pool.
132 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
133 * @param filePath - Path to the worker file.
134 * @param opts - Options for the pool.
135 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
138 protected readonly minimumNumberOfWorkers
: number,
139 protected readonly filePath
: string,
140 protected readonly opts
: PoolOptions
<Worker
>,
141 protected readonly maximumNumberOfWorkers
?: number
143 if (!this.isMain()) {
145 'Cannot start a pool from a worker with the same type as the pool'
149 checkFilePath(this.filePath
)
150 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
151 this.checkPoolOptions(this.opts
)
153 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
154 this.executeTask
= this.executeTask
.bind(this)
155 this.enqueueTask
= this.enqueueTask
.bind(this)
157 if (this.opts
.enableEvents
=== true) {
158 this.initializeEventEmitter()
160 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
166 this.opts
.workerChoiceStrategy
,
167 this.opts
.workerChoiceStrategyOptions
172 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
175 this.starting
= false
176 this.destroying
= false
177 this.readyEventEmitted
= false
178 if (this.opts
.startWorkers
=== true) {
182 this.startTimestamp
= performance
.now()
185 private checkPoolType (): void {
186 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
188 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
193 private checkMinimumNumberOfWorkers (
194 minimumNumberOfWorkers
: number | undefined
196 if (minimumNumberOfWorkers
== null) {
198 'Cannot instantiate a pool without specifying the number of workers'
200 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
202 'Cannot instantiate a pool with a non safe integer number of workers'
204 } else if (minimumNumberOfWorkers
< 0) {
205 throw new RangeError(
206 'Cannot instantiate a pool with a negative number of workers'
208 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
209 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
213 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
214 if (isPlainObject(opts
)) {
215 this.opts
.startWorkers
= opts
.startWorkers
?? true
216 checkValidWorkerChoiceStrategy(opts
.workerChoiceStrategy
)
217 this.opts
.workerChoiceStrategy
=
218 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
219 this.checkValidWorkerChoiceStrategyOptions(
220 opts
.workerChoiceStrategyOptions
222 if (opts
.workerChoiceStrategyOptions
!= null) {
223 this.opts
.workerChoiceStrategyOptions
= opts
.workerChoiceStrategyOptions
225 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
226 this.opts
.enableEvents
= opts
.enableEvents
?? true
227 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
228 if (this.opts
.enableTasksQueue
) {
229 checkValidTasksQueueOptions(opts
.tasksQueueOptions
)
230 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
231 opts
.tasksQueueOptions
235 throw new TypeError('Invalid pool options: must be a plain object')
239 private checkValidWorkerChoiceStrategyOptions (
240 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
243 workerChoiceStrategyOptions
!= null &&
244 !isPlainObject(workerChoiceStrategyOptions
)
247 'Invalid worker choice strategy options: must be a plain object'
251 workerChoiceStrategyOptions
?.weights
!= null &&
252 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
253 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
256 'Invalid worker choice strategy options: must have a weight for each worker node'
260 workerChoiceStrategyOptions
?.measurement
!= null &&
261 !Object.values(Measurements
).includes(
262 workerChoiceStrategyOptions
.measurement
266 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
271 private initializeEventEmitter (): void {
272 this.emitter
= new EventEmitterAsyncResource({
273 name
: `poolifier:${this.type}-${this.worker}-pool`
278 public get
info (): PoolInfo
{
283 started
: this.started
,
285 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
286 strategy
: this.opts
.workerChoiceStrategy
!,
287 strategyRetries
: this.workerChoiceStrategyContext
?.retriesCount
?? 0,
288 minSize
: this.minimumNumberOfWorkers
,
289 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
290 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
291 .runTime
.aggregate
=== true &&
292 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
293 .waitTime
.aggregate
&& {
294 utilization
: round(this.utilization
)
296 workerNodes
: this.workerNodes
.length
,
297 idleWorkerNodes
: this.workerNodes
.reduce(
298 (accumulator
, workerNode
) =>
299 workerNode
.usage
.tasks
.executing
=== 0
304 ...(this.opts
.enableTasksQueue
=== true && {
305 stealingWorkerNodes
: this.workerNodes
.reduce(
306 (accumulator
, workerNode
) =>
307 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
311 busyWorkerNodes
: this.workerNodes
.reduce(
312 (accumulator
, _workerNode
, workerNodeKey
) =>
313 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
316 executedTasks
: this.workerNodes
.reduce(
317 (accumulator
, workerNode
) =>
318 accumulator
+ workerNode
.usage
.tasks
.executed
,
321 executingTasks
: this.workerNodes
.reduce(
322 (accumulator
, workerNode
) =>
323 accumulator
+ workerNode
.usage
.tasks
.executing
,
326 ...(this.opts
.enableTasksQueue
=== true && {
327 queuedTasks
: this.workerNodes
.reduce(
328 (accumulator
, workerNode
) =>
329 accumulator
+ workerNode
.usage
.tasks
.queued
,
333 ...(this.opts
.enableTasksQueue
=== true && {
334 maxQueuedTasks
: this.workerNodes
.reduce(
335 (accumulator
, workerNode
) =>
336 accumulator
+ (workerNode
.usage
.tasks
.maxQueued
?? 0),
340 ...(this.opts
.enableTasksQueue
=== true && {
341 backPressure
: this.hasBackPressure()
343 ...(this.opts
.enableTasksQueue
=== true && {
344 stolenTasks
: this.workerNodes
.reduce(
345 (accumulator
, workerNode
) =>
346 accumulator
+ workerNode
.usage
.tasks
.stolen
,
350 failedTasks
: this.workerNodes
.reduce(
351 (accumulator
, workerNode
) =>
352 accumulator
+ workerNode
.usage
.tasks
.failed
,
355 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
356 .runTime
.aggregate
=== true && {
360 ...this.workerNodes
.map(
361 workerNode
=> workerNode
.usage
.runTime
.minimum
?? Infinity
367 ...this.workerNodes
.map(
368 workerNode
=> workerNode
.usage
.runTime
.maximum
?? -Infinity
372 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
373 .runTime
.average
&& {
376 this.workerNodes
.reduce
<number[]>(
377 (accumulator
, workerNode
) =>
378 accumulator
.concat(workerNode
.usage
.runTime
.history
),
384 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
388 this.workerNodes
.reduce
<number[]>(
389 (accumulator
, workerNode
) =>
390 accumulator
.concat(workerNode
.usage
.runTime
.history
),
398 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
399 .waitTime
.aggregate
=== true && {
403 ...this.workerNodes
.map(
404 workerNode
=> workerNode
.usage
.waitTime
.minimum
?? Infinity
410 ...this.workerNodes
.map(
411 workerNode
=> workerNode
.usage
.waitTime
.maximum
?? -Infinity
415 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
416 .waitTime
.average
&& {
419 this.workerNodes
.reduce
<number[]>(
420 (accumulator
, workerNode
) =>
421 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
427 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
428 .waitTime
.median
&& {
431 this.workerNodes
.reduce
<number[]>(
432 (accumulator
, workerNode
) =>
433 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
445 * The pool readiness boolean status.
447 private get
ready (): boolean {
452 this.workerNodes
.reduce(
453 (accumulator
, workerNode
) =>
454 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
458 ) >= this.minimumNumberOfWorkers
463 * The pool emptiness boolean status.
465 protected get
empty (): boolean {
466 return this.minimumNumberOfWorkers
=== 0 && this.workerNodes
.length
=== 0
470 * The approximate pool utilization.
472 * @returns The pool utilization.
474 private get
utilization (): number {
475 const poolTimeCapacity
=
476 (performance
.now() - this.startTimestamp
) *
477 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
478 const totalTasksRunTime
= this.workerNodes
.reduce(
479 (accumulator
, workerNode
) =>
480 accumulator
+ (workerNode
.usage
.runTime
.aggregate
?? 0),
483 const totalTasksWaitTime
= this.workerNodes
.reduce(
484 (accumulator
, workerNode
) =>
485 accumulator
+ (workerNode
.usage
.waitTime
.aggregate
?? 0),
488 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
494 * If it is `'dynamic'`, it provides the `max` property.
496 protected abstract get
type (): PoolType
501 protected abstract get
worker (): WorkerType
504 * Checks if the worker id sent in the received message from a worker is valid.
506 * @param message - The received message.
507 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
509 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
510 if (message
.workerId
== null) {
511 throw new Error('Worker message received without worker id')
512 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
514 `Worker message received from unknown worker '${message.workerId}'`
520 * Gets the worker node key given its worker id.
522 * @param workerId - The worker id.
523 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
525 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
526 return this.workerNodes
.findIndex(
527 workerNode
=> workerNode
.info
.id
=== workerId
532 public setWorkerChoiceStrategy (
533 workerChoiceStrategy
: WorkerChoiceStrategy
,
534 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
536 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
537 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
538 this.workerChoiceStrategyContext
?.setWorkerChoiceStrategy(
539 this.opts
.workerChoiceStrategy
541 if (workerChoiceStrategyOptions
!= null) {
542 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
544 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
545 workerNode
.resetUsage()
546 this.sendStatisticsMessageToWorker(workerNodeKey
)
551 public setWorkerChoiceStrategyOptions (
552 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
554 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
555 if (workerChoiceStrategyOptions
!= null) {
556 this.opts
.workerChoiceStrategyOptions
= workerChoiceStrategyOptions
558 this.workerChoiceStrategyContext
?.setOptions(
559 this.opts
.workerChoiceStrategyOptions
564 public enableTasksQueue (
566 tasksQueueOptions
?: TasksQueueOptions
568 if (this.opts
.enableTasksQueue
=== true && !enable
) {
569 this.unsetTaskStealing()
570 this.unsetTasksStealingOnBackPressure()
571 this.flushTasksQueues()
573 this.opts
.enableTasksQueue
= enable
574 this.setTasksQueueOptions(tasksQueueOptions
)
578 public setTasksQueueOptions (
579 tasksQueueOptions
: TasksQueueOptions
| undefined
581 if (this.opts
.enableTasksQueue
=== true) {
582 checkValidTasksQueueOptions(tasksQueueOptions
)
583 this.opts
.tasksQueueOptions
=
584 this.buildTasksQueueOptions(tasksQueueOptions
)
585 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
586 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
!)
587 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
588 this.unsetTaskStealing()
589 this.setTaskStealing()
591 this.unsetTaskStealing()
593 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
594 this.unsetTasksStealingOnBackPressure()
595 this.setTasksStealingOnBackPressure()
597 this.unsetTasksStealingOnBackPressure()
599 } else if (this.opts
.tasksQueueOptions
!= null) {
600 delete this.opts
.tasksQueueOptions
604 private buildTasksQueueOptions (
605 tasksQueueOptions
: TasksQueueOptions
| undefined
606 ): TasksQueueOptions
{
608 ...getDefaultTasksQueueOptions(
609 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
615 private setTasksQueueSize (size
: number): void {
616 for (const workerNode
of this.workerNodes
) {
617 workerNode
.tasksQueueBackPressureSize
= size
621 private setTaskStealing (): void {
622 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
623 this.workerNodes
[workerNodeKey
].on('idle', this.handleWorkerNodeIdleEvent
)
627 private unsetTaskStealing (): void {
628 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
629 this.workerNodes
[workerNodeKey
].off(
631 this.handleWorkerNodeIdleEvent
636 private setTasksStealingOnBackPressure (): void {
637 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
638 this.workerNodes
[workerNodeKey
].on(
640 this.handleWorkerNodeBackPressureEvent
645 private unsetTasksStealingOnBackPressure (): void {
646 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
647 this.workerNodes
[workerNodeKey
].off(
649 this.handleWorkerNodeBackPressureEvent
655 * Whether the pool is full or not.
657 * The pool filling boolean status.
659 protected get
full (): boolean {
661 this.workerNodes
.length
>=
662 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
667 * Whether the pool is busy or not.
669 * The pool busyness boolean status.
671 protected abstract get
busy (): boolean
674 * Whether worker nodes are executing concurrently their tasks quota or not.
676 * @returns Worker nodes busyness boolean status.
678 protected internalBusy (): boolean {
679 if (this.opts
.enableTasksQueue
=== true) {
681 this.workerNodes
.findIndex(
683 workerNode
.info
.ready
&&
684 workerNode
.usage
.tasks
.executing
<
685 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
686 this.opts
.tasksQueueOptions
!.concurrency
!
691 this.workerNodes
.findIndex(
693 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
698 private isWorkerNodeBusy (workerNodeKey
: number): boolean {
699 if (this.opts
.enableTasksQueue
=== true) {
701 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
>=
702 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
703 this.opts
.tasksQueueOptions
!.concurrency
!
706 return this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
> 0
709 private async sendTaskFunctionOperationToWorker (
710 workerNodeKey
: number,
711 message
: MessageValue
<Data
>
712 ): Promise
<boolean> {
713 return await new Promise
<boolean>((resolve
, reject
) => {
714 const taskFunctionOperationListener
= (
715 message
: MessageValue
<Response
>
717 this.checkMessageWorkerId(message
)
718 const workerId
= this.getWorkerInfo(workerNodeKey
)?.id
720 message
.taskFunctionOperationStatus
!= null &&
721 message
.workerId
=== workerId
723 if (message
.taskFunctionOperationStatus
) {
728 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
732 this.deregisterWorkerMessageListener(
733 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
734 taskFunctionOperationListener
738 this.registerWorkerMessageListener(
740 taskFunctionOperationListener
742 this.sendToWorker(workerNodeKey
, message
)
746 private async sendTaskFunctionOperationToWorkers (
747 message
: MessageValue
<Data
>
748 ): Promise
<boolean> {
749 return await new Promise
<boolean>((resolve
, reject
) => {
750 const responsesReceived
= new Array<MessageValue
<Response
>>()
751 const taskFunctionOperationsListener
= (
752 message
: MessageValue
<Response
>
754 this.checkMessageWorkerId(message
)
755 if (message
.taskFunctionOperationStatus
!= null) {
756 responsesReceived
.push(message
)
757 if (responsesReceived
.length
=== this.workerNodes
.length
) {
759 responsesReceived
.every(
760 message
=> message
.taskFunctionOperationStatus
=== true
765 responsesReceived
.some(
766 message
=> message
.taskFunctionOperationStatus
=== false
769 const errorResponse
= responsesReceived
.find(
770 response
=> response
.taskFunctionOperationStatus
=== false
774 `Task function operation '${
775 message.taskFunctionOperation as string
776 }' failed on worker ${errorResponse?.workerId} with error: '${
777 errorResponse?.workerError?.message
782 this.deregisterWorkerMessageListener(
783 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
784 taskFunctionOperationsListener
789 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
790 this.registerWorkerMessageListener(
792 taskFunctionOperationsListener
794 this.sendToWorker(workerNodeKey
, message
)
800 public hasTaskFunction (name
: string): boolean {
801 for (const workerNode
of this.workerNodes
) {
803 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
804 workerNode
.info
.taskFunctionNames
.includes(name
)
813 public async addTaskFunction (
815 fn
: TaskFunction
<Data
, Response
>
816 ): Promise
<boolean> {
817 if (typeof name
!== 'string') {
818 throw new TypeError('name argument must be a string')
820 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
821 throw new TypeError('name argument must not be an empty string')
823 if (typeof fn
!== 'function') {
824 throw new TypeError('fn argument must be a function')
826 const opResult
= await this.sendTaskFunctionOperationToWorkers({
827 taskFunctionOperation
: 'add',
828 taskFunctionName
: name
,
829 taskFunction
: fn
.toString()
831 this.taskFunctions
.set(name
, fn
)
836 public async removeTaskFunction (name
: string): Promise
<boolean> {
837 if (!this.taskFunctions
.has(name
)) {
839 'Cannot remove a task function not handled on the pool side'
842 const opResult
= await this.sendTaskFunctionOperationToWorkers({
843 taskFunctionOperation
: 'remove',
844 taskFunctionName
: name
846 this.deleteTaskFunctionWorkerUsages(name
)
847 this.taskFunctions
.delete(name
)
852 public listTaskFunctionNames (): string[] {
853 for (const workerNode
of this.workerNodes
) {
855 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
856 workerNode
.info
.taskFunctionNames
.length
> 0
858 return workerNode
.info
.taskFunctionNames
865 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
866 return await this.sendTaskFunctionOperationToWorkers({
867 taskFunctionOperation
: 'default',
868 taskFunctionName
: name
872 private deleteTaskFunctionWorkerUsages (name
: string): void {
873 for (const workerNode
of this.workerNodes
) {
874 workerNode
.deleteTaskFunctionWorkerUsage(name
)
878 private shallExecuteTask (workerNodeKey
: number): boolean {
880 this.tasksQueueSize(workerNodeKey
) === 0 &&
881 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
882 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
883 this.opts
.tasksQueueOptions
!.concurrency
!
888 public async execute (
891 transferList
?: TransferListItem
[]
892 ): Promise
<Response
> {
893 return await new Promise
<Response
>((resolve
, reject
) => {
895 reject(new Error('Cannot execute a task on not started pool'))
898 if (this.destroying
) {
899 reject(new Error('Cannot execute a task on destroying pool'))
902 if (name
!= null && typeof name
!== 'string') {
903 reject(new TypeError('name argument must be a string'))
908 typeof name
=== 'string' &&
909 name
.trim().length
=== 0
911 reject(new TypeError('name argument must not be an empty string'))
914 if (transferList
!= null && !Array.isArray(transferList
)) {
915 reject(new TypeError('transferList argument must be an array'))
918 const timestamp
= performance
.now()
919 const workerNodeKey
= this.chooseWorkerNode()
920 const task
: Task
<Data
> = {
921 name
: name
?? DEFAULT_TASK_NAME
,
922 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
923 data
: data
?? ({} as Data
),
928 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
929 this.promiseResponseMap
.set(task
.taskId
!, {
933 ...(this.emitter
!= null && {
934 asyncResource
: new AsyncResource('poolifier:task', {
935 triggerAsyncId
: this.emitter
.asyncId
,
936 requireManualDestroy
: true
941 this.opts
.enableTasksQueue
=== false ||
942 (this.opts
.enableTasksQueue
=== true &&
943 this.shallExecuteTask(workerNodeKey
))
945 this.executeTask(workerNodeKey
, task
)
947 this.enqueueTask(workerNodeKey
, task
)
953 public start (): void {
955 throw new Error('Cannot start an already started pool')
958 throw new Error('Cannot start an already starting pool')
960 if (this.destroying
) {
961 throw new Error('Cannot start a destroying pool')
965 this.workerNodes
.reduce(
966 (accumulator
, workerNode
) =>
967 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
969 ) < this.minimumNumberOfWorkers
971 this.createAndSetupWorkerNode()
973 this.starting
= false
978 public async destroy (): Promise
<void> {
980 throw new Error('Cannot destroy an already destroyed pool')
983 throw new Error('Cannot destroy an starting pool')
985 if (this.destroying
) {
986 throw new Error('Cannot destroy an already destroying pool')
988 this.destroying
= true
990 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
991 await this.destroyWorkerNode(workerNodeKey
)
994 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
995 this.emitter
?.emitDestroy()
996 this.emitter
?.removeAllListeners()
997 this.readyEventEmitted
= false
998 this.destroying
= false
1002 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
1003 await new Promise
<void>((resolve
, reject
) => {
1004 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1005 if (this.workerNodes
[workerNodeKey
] == null) {
1009 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1010 this.checkMessageWorkerId(message
)
1011 if (message
.kill
=== 'success') {
1013 } else if (message
.kill
=== 'failure') {
1016 `Kill message handling failed on worker ${message.workerId}`
1021 // FIXME: should be registered only once
1022 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1023 this.sendToWorker(workerNodeKey
, { kill
: true })
1028 * Terminates the worker node given its worker node key.
1030 * @param workerNodeKey - The worker node key.
1032 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1033 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1034 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1035 const workerNode
= this.workerNodes
[workerNodeKey
]
1036 await waitWorkerNodeEvents(
1040 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1041 getDefaultTasksQueueOptions(
1042 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1043 ).tasksFinishedTimeout
1045 await this.sendKillMessageToWorker(workerNodeKey
)
1046 await workerNode
.terminate()
1050 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1051 * Can be overridden.
1055 protected setupHook (): void {
1056 /* Intentionally empty */
1060 * Should return whether the worker is the main worker or not.
1062 protected abstract isMain (): boolean
1065 * Hook executed before the worker task execution.
1066 * Can be overridden.
1068 * @param workerNodeKey - The worker node key.
1069 * @param task - The task to execute.
1071 protected beforeTaskExecutionHook (
1072 workerNodeKey
: number,
1075 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1076 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1077 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1078 ++workerUsage
.tasks
.executing
1079 updateWaitTimeWorkerUsage(
1080 this.workerChoiceStrategyContext
,
1086 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1087 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1088 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(task
.name
!) !=
1091 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1092 const taskFunctionWorkerUsage
= this.workerNodes
[
1094 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1095 ].getTaskFunctionWorkerUsage(task
.name
!)!
1096 ++taskFunctionWorkerUsage
.tasks
.executing
1097 updateWaitTimeWorkerUsage(
1098 this.workerChoiceStrategyContext
,
1099 taskFunctionWorkerUsage
,
1106 * Hook executed after the worker task execution.
1107 * Can be overridden.
1109 * @param workerNodeKey - The worker node key.
1110 * @param message - The received message.
1112 protected afterTaskExecutionHook (
1113 workerNodeKey
: number,
1114 message
: MessageValue
<Response
>
1116 let needWorkerChoiceStrategyUpdate
= false
1117 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1118 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1119 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1120 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1121 updateRunTimeWorkerUsage(
1122 this.workerChoiceStrategyContext
,
1126 updateEluWorkerUsage(
1127 this.workerChoiceStrategyContext
,
1131 needWorkerChoiceStrategyUpdate
= true
1134 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1135 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1136 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1137 message
.taskPerformance
!.name
1140 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1141 const taskFunctionWorkerUsage
= this.workerNodes
[
1143 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1144 ].getTaskFunctionWorkerUsage(message
.taskPerformance
!.name
)!
1145 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1146 updateRunTimeWorkerUsage(
1147 this.workerChoiceStrategyContext
,
1148 taskFunctionWorkerUsage
,
1151 updateEluWorkerUsage(
1152 this.workerChoiceStrategyContext
,
1153 taskFunctionWorkerUsage
,
1156 needWorkerChoiceStrategyUpdate
= true
1158 if (needWorkerChoiceStrategyUpdate
) {
1159 this.workerChoiceStrategyContext
?.update(workerNodeKey
)
1164 * Whether the worker node shall update its task function worker usage or not.
1166 * @param workerNodeKey - The worker node key.
1167 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1169 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1170 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1172 workerInfo
!= null &&
1173 Array.isArray(workerInfo
.taskFunctionNames
) &&
1174 workerInfo
.taskFunctionNames
.length
> 2
1179 * Chooses a worker node for the next task.
1181 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1183 * @returns The chosen worker node key
1185 private chooseWorkerNode (): number {
1186 if (this.shallCreateDynamicWorker()) {
1187 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1189 this.workerChoiceStrategyContext
?.getStrategyPolicy()
1190 .dynamicWorkerUsage
=== true
1192 return workerNodeKey
1195 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1196 return this.workerChoiceStrategyContext
!.execute()
1200 * Conditions for dynamic worker creation.
1202 * @returns Whether to create a dynamic worker or not.
1204 protected abstract shallCreateDynamicWorker (): boolean
1207 * Sends a message to worker given its worker node key.
1209 * @param workerNodeKey - The worker node key.
1210 * @param message - The message.
1211 * @param transferList - The optional array of transferable objects.
1213 protected abstract sendToWorker (
1214 workerNodeKey
: number,
1215 message
: MessageValue
<Data
>,
1216 transferList
?: TransferListItem
[]
1220 * Creates a new, completely set up worker node.
1222 * @returns New, completely set up worker node key.
1224 protected createAndSetupWorkerNode (): number {
1225 const workerNode
= this.createWorkerNode()
1226 workerNode
.registerWorkerEventHandler(
1228 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1230 workerNode
.registerWorkerEventHandler(
1232 this.opts
.messageHandler
?? EMPTY_FUNCTION
1234 workerNode
.registerWorkerEventHandler(
1236 this.opts
.errorHandler
?? EMPTY_FUNCTION
1238 workerNode
.registerWorkerEventHandler('error', (error
: Error) => {
1239 workerNode
.info
.ready
= false
1240 this.emitter
?.emit(PoolEvents
.error
, error
)
1244 this.opts
.restartWorkerOnError
=== true
1246 if (workerNode
.info
.dynamic
) {
1247 this.createAndSetupDynamicWorkerNode()
1249 this.createAndSetupWorkerNode()
1255 this.opts
.enableTasksQueue
=== true
1257 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
1259 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1260 workerNode
?.terminate().catch(error
=> {
1261 this.emitter
?.emit(PoolEvents
.error
, error
)
1264 workerNode
.registerWorkerEventHandler(
1266 this.opts
.exitHandler
?? EMPTY_FUNCTION
1268 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1269 this.removeWorkerNode(workerNode
)
1271 const workerNodeKey
= this.addWorkerNode(workerNode
)
1272 this.afterWorkerNodeSetup(workerNodeKey
)
1273 return workerNodeKey
1277 * Creates a new, completely set up dynamic worker node.
1279 * @returns New, completely set up dynamic worker node key.
1281 protected createAndSetupDynamicWorkerNode (): number {
1282 const workerNodeKey
= this.createAndSetupWorkerNode()
1283 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1284 this.checkMessageWorkerId(message
)
1285 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1288 const workerUsage
= this.workerNodes
[localWorkerNodeKey
]?.usage
1289 // Kill message received from worker
1291 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1292 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1293 ((this.opts
.enableTasksQueue
=== false &&
1294 workerUsage
.tasks
.executing
=== 0) ||
1295 (this.opts
.enableTasksQueue
=== true &&
1296 workerUsage
.tasks
.executing
=== 0 &&
1297 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1299 // Flag the worker node as not ready immediately
1300 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1301 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1302 this.emitter
?.emit(PoolEvents
.error
, error
)
1306 this.sendToWorker(workerNodeKey
, {
1309 if (this.taskFunctions
.size
> 0) {
1310 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1311 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1312 taskFunctionOperation
: 'add',
1314 taskFunction
: taskFunction
.toString()
1316 this.emitter
?.emit(PoolEvents
.error
, error
)
1320 const workerNode
= this.workerNodes
[workerNodeKey
]
1321 workerNode
.info
.dynamic
= true
1323 this.workerChoiceStrategyContext
?.getStrategyPolicy()
1324 .dynamicWorkerReady
=== true ||
1325 this.workerChoiceStrategyContext
?.getStrategyPolicy()
1326 .dynamicWorkerUsage
=== true
1328 workerNode
.info
.ready
= true
1330 this.checkAndEmitDynamicWorkerCreationEvents()
1331 return workerNodeKey
1335 * Registers a listener callback on the worker given its worker node key.
1337 * @param workerNodeKey - The worker node key.
1338 * @param listener - The message listener callback.
1340 protected abstract registerWorkerMessageListener
<
1341 Message
extends Data
| Response
1343 workerNodeKey
: number,
1344 listener
: (message
: MessageValue
<Message
>) => void
1348 * Registers once a listener callback on the worker given its worker node key.
1350 * @param workerNodeKey - The worker node key.
1351 * @param listener - The message listener callback.
1353 protected abstract registerOnceWorkerMessageListener
<
1354 Message
extends Data
| Response
1356 workerNodeKey
: number,
1357 listener
: (message
: MessageValue
<Message
>) => void
1361 * Deregisters a listener callback on the worker given its worker node key.
1363 * @param workerNodeKey - The worker node key.
1364 * @param listener - The message listener callback.
1366 protected abstract deregisterWorkerMessageListener
<
1367 Message
extends Data
| Response
1369 workerNodeKey
: number,
1370 listener
: (message
: MessageValue
<Message
>) => void
1374 * Method hooked up after a worker node has been newly created.
1375 * Can be overridden.
1377 * @param workerNodeKey - The newly created worker node key.
1379 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1380 // Listen to worker messages.
1381 this.registerWorkerMessageListener(
1383 this.workerMessageListener
1385 // Send the startup message to worker.
1386 this.sendStartupMessageToWorker(workerNodeKey
)
1387 // Send the statistics message to worker.
1388 this.sendStatisticsMessageToWorker(workerNodeKey
)
1389 if (this.opts
.enableTasksQueue
=== true) {
1390 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1391 this.workerNodes
[workerNodeKey
].on(
1393 this.handleWorkerNodeIdleEvent
1396 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1397 this.workerNodes
[workerNodeKey
].on(
1399 this.handleWorkerNodeBackPressureEvent
1406 * Sends the startup message to worker given its worker node key.
1408 * @param workerNodeKey - The worker node key.
1410 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1413 * Sends the statistics message to worker given its worker node key.
1415 * @param workerNodeKey - The worker node key.
1417 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1418 this.sendToWorker(workerNodeKey
, {
1421 this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
1422 .runTime
.aggregate
?? false,
1424 this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements().elu
1430 private cannotStealTask (): boolean {
1431 return this.workerNodes
.length
<= 1 || this.info
.queuedTasks
=== 0
1434 private handleTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1435 if (this.shallExecuteTask(workerNodeKey
)) {
1436 this.executeTask(workerNodeKey
, task
)
1438 this.enqueueTask(workerNodeKey
, task
)
1442 private redistributeQueuedTasks (workerNodeKey
: number): void {
1443 if (workerNodeKey
=== -1 || this.cannotStealTask()) {
1446 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1447 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1448 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1449 return workerNode
.info
.ready
&&
1450 workerNode
.usage
.tasks
.queued
<
1451 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1458 destinationWorkerNodeKey
,
1459 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1460 this.dequeueTask(workerNodeKey
)!
1465 private updateTaskStolenStatisticsWorkerUsage (
1466 workerNodeKey
: number,
1469 const workerNode
= this.workerNodes
[workerNodeKey
]
1470 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1471 if (workerNode
?.usage
!= null) {
1472 ++workerNode
.usage
.tasks
.stolen
1475 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1476 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1478 const taskFunctionWorkerUsage
=
1479 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1480 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1481 ++taskFunctionWorkerUsage
.tasks
.stolen
1485 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1486 workerNodeKey
: number
1488 const workerNode
= this.workerNodes
[workerNodeKey
]
1489 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1490 if (workerNode
?.usage
!= null) {
1491 ++workerNode
.usage
.tasks
.sequentiallyStolen
1495 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1496 workerNodeKey
: number,
1499 const workerNode
= this.workerNodes
[workerNodeKey
]
1501 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1502 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1504 const taskFunctionWorkerUsage
=
1505 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1506 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1507 ++taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
1511 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1512 workerNodeKey
: number
1514 const workerNode
= this.workerNodes
[workerNodeKey
]
1515 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1516 if (workerNode
?.usage
!= null) {
1517 workerNode
.usage
.tasks
.sequentiallyStolen
= 0
1521 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1522 workerNodeKey
: number,
1525 const workerNode
= this.workerNodes
[workerNodeKey
]
1527 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1528 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1530 const taskFunctionWorkerUsage
=
1531 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1532 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1533 taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
= 0
1537 private readonly handleWorkerNodeIdleEvent
= (
1538 eventDetail
: WorkerNodeEventDetail
,
1539 previousStolenTask
?: Task
<Data
>
1541 const { workerNodeKey
} = eventDetail
1542 if (workerNodeKey
== null) {
1544 "WorkerNode event detail 'workerNodeKey' property must be defined"
1547 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1549 this.cannotStealTask() ||
1550 (this.info
.stealingWorkerNodes
?? 0) >
1551 Math.floor(this.workerNodes
.length
/ 2)
1553 if (workerInfo
!= null && previousStolenTask
!= null) {
1554 workerInfo
.stealing
= false
1558 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1560 workerInfo
!= null &&
1561 previousStolenTask
!= null &&
1562 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1563 (workerNodeTasksUsage
.executing
> 0 ||
1564 this.tasksQueueSize(workerNodeKey
) > 0)
1566 workerInfo
.stealing
= false
1567 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1568 for (const taskName
of workerInfo
.taskFunctionNames
!) {
1569 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1574 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1577 if (workerInfo
== null) {
1579 `Worker node with key '${workerNodeKey}' not found in pool`
1582 workerInfo
.stealing
= true
1583 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1585 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1588 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1589 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1591 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1592 ].getTaskFunctionWorkerUsage(stolenTask
.name
!)!.tasks
1594 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1595 (previousStolenTask
!= null &&
1596 previousStolenTask
.name
=== stolenTask
.name
&&
1597 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1599 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1601 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1605 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1607 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1612 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1614 this.handleWorkerNodeIdleEvent(eventDetail
, stolenTask
)
1618 this.emitter
?.emit(PoolEvents
.error
, error
)
1622 private readonly workerNodeStealTask
= (
1623 workerNodeKey
: number
1624 ): Task
<Data
> | undefined => {
1625 const workerNodes
= this.workerNodes
1628 (workerNodeA
, workerNodeB
) =>
1629 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1631 const sourceWorkerNode
= workerNodes
.find(
1632 (sourceWorkerNode
, sourceWorkerNodeKey
) =>
1633 sourceWorkerNode
.info
.ready
&&
1634 !sourceWorkerNode
.info
.stealing
&&
1635 sourceWorkerNodeKey
!== workerNodeKey
&&
1636 sourceWorkerNode
.usage
.tasks
.queued
> 0
1638 if (sourceWorkerNode
!= null) {
1639 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1640 const task
= sourceWorkerNode
.popTask()!
1641 this.handleTask(workerNodeKey
, task
)
1642 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1643 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1644 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey
, task
.name
!)
1649 private readonly handleWorkerNodeBackPressureEvent
= (
1650 eventDetail
: WorkerNodeEventDetail
1653 this.cannotStealTask() ||
1654 (this.info
.stealingWorkerNodes
?? 0) >
1655 Math.floor(this.workerNodes
.length
/ 2)
1659 const { workerId
} = eventDetail
1660 const sizeOffset
= 1
1661 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1662 if (this.opts
.tasksQueueOptions
!.size
! <= sizeOffset
) {
1665 const sourceWorkerNode
=
1666 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1667 const workerNodes
= this.workerNodes
1670 (workerNodeA
, workerNodeB
) =>
1671 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1673 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1675 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1676 workerNode
.info
.ready
&&
1677 !workerNode
.info
.stealing
&&
1678 workerNode
.info
.id
!== workerId
&&
1679 workerNode
.usage
.tasks
.queued
<
1680 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1681 this.opts
.tasksQueueOptions
!.size
! - sizeOffset
1683 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1684 if (workerInfo
== null) {
1686 `Worker node with key '${workerNodeKey}' not found in pool`
1689 workerInfo
.stealing
= true
1690 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1691 const task
= sourceWorkerNode
.popTask()!
1692 this.handleTask(workerNodeKey
, task
)
1693 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1694 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey
, task
.name
!)
1695 workerInfo
.stealing
= false
1701 * This method is the message listener registered on each worker.
1703 protected readonly workerMessageListener
= (
1704 message
: MessageValue
<Response
>
1706 this.checkMessageWorkerId(message
)
1707 const { workerId
, ready
, taskId
, taskFunctionNames
} = message
1708 if (ready
!= null && taskFunctionNames
!= null) {
1709 // Worker ready response received from worker
1710 this.handleWorkerReadyResponse(message
)
1711 } else if (taskId
!= null) {
1712 // Task execution response received from worker
1713 this.handleTaskExecutionResponse(message
)
1714 } else if (taskFunctionNames
!= null) {
1715 // Task function names message received from worker
1716 const workerInfo
= this.getWorkerInfo(
1717 this.getWorkerNodeKeyByWorkerId(workerId
)
1719 if (workerInfo
!= null) {
1720 workerInfo
.taskFunctionNames
= taskFunctionNames
1725 private checkAndEmitReadyEvent (): void {
1726 if (!this.readyEventEmitted
&& this.ready
) {
1727 this.emitter
?.emit(PoolEvents
.ready
, this.info
)
1728 this.readyEventEmitted
= true
1732 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1733 const { workerId
, ready
, taskFunctionNames
} = message
1734 if (ready
== null || !ready
) {
1735 throw new Error(`Worker ${workerId} failed to initialize`)
1738 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1739 workerNode
.info
.ready
= ready
1740 workerNode
.info
.taskFunctionNames
= taskFunctionNames
1741 this.checkAndEmitReadyEvent()
1744 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1745 const { workerId
, taskId
, workerError
, data
} = message
1746 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1747 const promiseResponse
= this.promiseResponseMap
.get(taskId
!)
1748 if (promiseResponse
!= null) {
1749 const { resolve
, reject
, workerNodeKey
, asyncResource
} = promiseResponse
1750 const workerNode
= this.workerNodes
[workerNodeKey
]
1751 if (workerError
!= null) {
1752 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1753 asyncResource
!= null
1754 ? asyncResource
.runInAsyncScope(
1759 : reject(workerError
.message
)
1761 asyncResource
!= null
1762 ? asyncResource
.runInAsyncScope(resolve
, this.emitter
, data
)
1763 : resolve(data
as Response
)
1765 asyncResource
?.emitDestroy()
1766 this.afterTaskExecutionHook(workerNodeKey
, message
)
1767 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1768 this.promiseResponseMap
.delete(taskId
!)
1769 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1770 workerNode
?.emit('taskFinished', taskId
)
1772 this.opts
.enableTasksQueue
=== true &&
1774 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1777 const workerNodeTasksUsage
= workerNode
.usage
.tasks
1779 this.tasksQueueSize(workerNodeKey
) > 0 &&
1780 workerNodeTasksUsage
.executing
<
1781 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1782 this.opts
.tasksQueueOptions
!.concurrency
!
1784 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1785 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
1788 workerNodeTasksUsage
.executing
=== 0 &&
1789 this.tasksQueueSize(workerNodeKey
) === 0 &&
1790 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1792 workerNode
.emit('idle', {
1801 private checkAndEmitTaskExecutionEvents (): void {
1803 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1807 private checkAndEmitTaskQueuingEvents (): void {
1808 if (this.hasBackPressure()) {
1809 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1814 * Emits dynamic worker creation events.
1816 protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1819 * Gets the worker information given its worker node key.
1821 * @param workerNodeKey - The worker node key.
1822 * @returns The worker information.
1824 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
| undefined {
1825 return this.workerNodes
[workerNodeKey
]?.info
1829 * Creates a worker node.
1831 * @returns The created worker node.
1833 private createWorkerNode (): IWorkerNode
<Worker
, Data
> {
1834 const workerNode
= new WorkerNode
<Worker
, Data
>(
1839 workerOptions
: this.opts
.workerOptions
,
1840 tasksQueueBackPressureSize
:
1841 this.opts
.tasksQueueOptions
?.size
??
1842 getDefaultTasksQueueOptions(
1843 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1847 // Flag the worker node as ready at pool startup.
1848 if (this.starting
) {
1849 workerNode
.info
.ready
= true
1855 * Adds the given worker node in the pool worker nodes.
1857 * @param workerNode - The worker node.
1858 * @returns The added worker node key.
1859 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1861 private addWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): number {
1862 this.workerNodes
.push(workerNode
)
1863 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
1864 if (workerNodeKey
=== -1) {
1865 throw new Error('Worker added not found in worker nodes')
1867 return workerNodeKey
1870 private checkAndEmitEmptyEvent (): void {
1872 this.emitter
?.emit(PoolEvents
.empty
, this.info
)
1873 this.readyEventEmitted
= false
1878 * Removes the worker node from the pool worker nodes.
1880 * @param workerNode - The worker node.
1882 private removeWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): void {
1883 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
1884 if (workerNodeKey
!== -1) {
1885 this.workerNodes
.splice(workerNodeKey
, 1)
1886 this.workerChoiceStrategyContext
?.remove(workerNodeKey
)
1888 this.checkAndEmitEmptyEvent()
1891 protected flagWorkerNodeAsNotReady (workerNodeKey
: number): void {
1892 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1893 if (workerInfo
!= null) {
1894 workerInfo
.ready
= false
1898 private hasBackPressure (): boolean {
1900 this.opts
.enableTasksQueue
=== true &&
1901 this.workerNodes
.findIndex(
1902 workerNode
=> !workerNode
.hasBackPressure()
1908 * Executes the given task on the worker given its worker node key.
1910 * @param workerNodeKey - The worker node key.
1911 * @param task - The task to execute.
1913 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1914 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1915 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1916 this.checkAndEmitTaskExecutionEvents()
1919 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1920 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1921 this.checkAndEmitTaskQueuingEvents()
1922 return tasksQueueSize
1925 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1926 return this.workerNodes
[workerNodeKey
].dequeueTask()
1929 private tasksQueueSize (workerNodeKey
: number): number {
1930 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1933 protected flushTasksQueue (workerNodeKey
: number): number {
1934 let flushedTasks
= 0
1935 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1936 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1937 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
1940 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1944 private flushTasksQueues (): void {
1945 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1946 this.flushTasksQueue(workerNodeKey
)