1 import { AsyncResource
} from
'node:async_hooks'
2 import { randomUUID
} from
'node:crypto'
3 import { EventEmitterAsyncResource
} from
'node:events'
4 import { performance
} from
'node:perf_hooks'
5 import type { TransferListItem
} from
'node:worker_threads'
9 PromiseResponseWrapper
,
11 } from
'../utility-types.js'
25 import type { TaskFunction
} from
'../worker/task-functions.js'
26 import { KillBehaviors
} from
'../worker/worker-options.js'
34 type TasksQueueOptions
38 WorkerChoiceStrategies
,
39 type WorkerChoiceStrategy
,
40 type WorkerChoiceStrategyOptions
41 } from
'./selection-strategies/selection-strategies-types.js'
42 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
45 checkValidTasksQueueOptions
,
46 checkValidWorkerChoiceStrategy
,
47 getDefaultTasksQueueOptions
,
49 updateRunTimeWorkerUsage
,
50 updateTaskStatisticsWorkerUsage
,
51 updateWaitTimeWorkerUsage
,
54 import { version
} from
'./version.js'
59 WorkerNodeEventDetail
,
62 import { WorkerNode
} from
'./worker-node.js'
65 * Base class that implements some shared logic for all poolifier pools.
67 * @typeParam Worker - Type of worker which manages this pool.
68 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
69 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
71 export abstract class AbstractPool
<
72 Worker
extends IWorker
,
75 > implements IPool
<Worker
, Data
, Response
> {
77 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
80 public emitter
?: EventEmitterAsyncResource
83 * The task execution response promise map:
84 * - `key`: The message id of each submitted task.
85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
89 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
90 new Map
<string, PromiseResponseWrapper
<Response
>>()
93 * Worker choice strategy context referencing a worker choice algorithm implementation.
95 protected workerChoiceStrategyContext
?: WorkerChoiceStrategyContext
<
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
106 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
109 * Whether the pool is started or not.
111 private started
: boolean
113 * Whether the pool is starting or not.
115 private starting
: boolean
117 * Whether the pool is destroying or not.
119 private destroying
: boolean
121 * Whether the pool ready event has been emitted or not.
123 private readyEventEmitted
: boolean
125 * The start timestamp of the pool.
127 private readonly startTimestamp
130 * Constructs a new poolifier pool.
132 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
133 * @param filePath - Path to the worker file.
134 * @param opts - Options for the pool.
135 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
138 protected readonly minimumNumberOfWorkers
: number,
139 protected readonly filePath
: string,
140 protected readonly opts
: PoolOptions
<Worker
>,
141 protected readonly maximumNumberOfWorkers
?: number
143 if (!this.isMain()) {
145 'Cannot start a pool from a worker with the same type as the pool'
149 checkFilePath(this.filePath
)
150 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
151 this.checkPoolOptions(this.opts
)
153 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
154 this.executeTask
= this.executeTask
.bind(this)
155 this.enqueueTask
= this.enqueueTask
.bind(this)
157 if (this.opts
.enableEvents
=== true) {
158 this.initializeEventEmitter()
160 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
166 this.opts
.workerChoiceStrategy
,
167 this.opts
.workerChoiceStrategyOptions
172 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
175 this.starting
= false
176 this.destroying
= false
177 this.readyEventEmitted
= false
178 if (this.opts
.startWorkers
=== true) {
182 this.startTimestamp
= performance
.now()
185 private checkPoolType (): void {
186 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
188 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
193 private checkMinimumNumberOfWorkers (
194 minimumNumberOfWorkers
: number | undefined
196 if (minimumNumberOfWorkers
== null) {
198 'Cannot instantiate a pool without specifying the number of workers'
200 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
202 'Cannot instantiate a pool with a non safe integer number of workers'
204 } else if (minimumNumberOfWorkers
< 0) {
205 throw new RangeError(
206 'Cannot instantiate a pool with a negative number of workers'
208 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
209 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
213 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
214 if (isPlainObject(opts
)) {
215 this.opts
.startWorkers
= opts
.startWorkers
?? true
216 checkValidWorkerChoiceStrategy(opts
.workerChoiceStrategy
)
217 this.opts
.workerChoiceStrategy
=
218 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
219 this.checkValidWorkerChoiceStrategyOptions(
220 opts
.workerChoiceStrategyOptions
222 if (opts
.workerChoiceStrategyOptions
!= null) {
223 this.opts
.workerChoiceStrategyOptions
= opts
.workerChoiceStrategyOptions
225 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
226 this.opts
.enableEvents
= opts
.enableEvents
?? true
227 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
228 if (this.opts
.enableTasksQueue
) {
229 checkValidTasksQueueOptions(opts
.tasksQueueOptions
)
230 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
231 opts
.tasksQueueOptions
235 throw new TypeError('Invalid pool options: must be a plain object')
239 private checkValidWorkerChoiceStrategyOptions (
240 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
243 workerChoiceStrategyOptions
!= null &&
244 !isPlainObject(workerChoiceStrategyOptions
)
247 'Invalid worker choice strategy options: must be a plain object'
251 workerChoiceStrategyOptions
?.weights
!= null &&
252 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
253 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
256 'Invalid worker choice strategy options: must have a weight for each worker node'
260 workerChoiceStrategyOptions
?.measurement
!= null &&
261 !Object.values(Measurements
).includes(
262 workerChoiceStrategyOptions
.measurement
266 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
271 private initializeEventEmitter (): void {
272 this.emitter
= new EventEmitterAsyncResource({
273 name
: `poolifier:${this.type}-${this.worker}-pool`
278 public get
info (): PoolInfo
{
283 started
: this.started
,
285 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
286 strategy
: this.opts
.workerChoiceStrategy
!,
287 strategyRetries
: this.workerChoiceStrategyContext
?.retriesCount
?? 0,
288 minSize
: this.minimumNumberOfWorkers
,
289 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
290 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
291 .runTime
.aggregate
=== true &&
292 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
293 .waitTime
.aggregate
&& {
294 utilization
: round(this.utilization
)
296 workerNodes
: this.workerNodes
.length
,
297 idleWorkerNodes
: this.workerNodes
.reduce(
298 (accumulator
, workerNode
) =>
299 workerNode
.usage
.tasks
.executing
=== 0
304 ...(this.opts
.enableTasksQueue
=== true && {
305 stealingWorkerNodes
: this.workerNodes
.reduce(
306 (accumulator
, workerNode
) =>
307 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
311 busyWorkerNodes
: this.workerNodes
.reduce(
312 (accumulator
, _workerNode
, workerNodeKey
) =>
313 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
316 executedTasks
: this.workerNodes
.reduce(
317 (accumulator
, workerNode
) =>
318 accumulator
+ workerNode
.usage
.tasks
.executed
,
321 executingTasks
: this.workerNodes
.reduce(
322 (accumulator
, workerNode
) =>
323 accumulator
+ workerNode
.usage
.tasks
.executing
,
326 ...(this.opts
.enableTasksQueue
=== true && {
327 queuedTasks
: this.workerNodes
.reduce(
328 (accumulator
, workerNode
) =>
329 accumulator
+ workerNode
.usage
.tasks
.queued
,
333 ...(this.opts
.enableTasksQueue
=== true && {
334 maxQueuedTasks
: this.workerNodes
.reduce(
335 (accumulator
, workerNode
) =>
336 accumulator
+ (workerNode
.usage
.tasks
.maxQueued
?? 0),
340 ...(this.opts
.enableTasksQueue
=== true && {
341 backPressure
: this.hasBackPressure()
343 ...(this.opts
.enableTasksQueue
=== true && {
344 stolenTasks
: this.workerNodes
.reduce(
345 (accumulator
, workerNode
) =>
346 accumulator
+ workerNode
.usage
.tasks
.stolen
,
350 failedTasks
: this.workerNodes
.reduce(
351 (accumulator
, workerNode
) =>
352 accumulator
+ workerNode
.usage
.tasks
.failed
,
355 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
356 .runTime
.aggregate
=== true && {
360 ...this.workerNodes
.map(
361 workerNode
=> workerNode
.usage
.runTime
.minimum
?? Infinity
367 ...this.workerNodes
.map(
368 workerNode
=> workerNode
.usage
.runTime
.maximum
?? -Infinity
372 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
373 .runTime
.average
&& {
376 this.workerNodes
.reduce
<number[]>(
377 (accumulator
, workerNode
) =>
378 accumulator
.concat(workerNode
.usage
.runTime
.history
),
384 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
388 this.workerNodes
.reduce
<number[]>(
389 (accumulator
, workerNode
) =>
390 accumulator
.concat(workerNode
.usage
.runTime
.history
),
398 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
399 .waitTime
.aggregate
=== true && {
403 ...this.workerNodes
.map(
404 workerNode
=> workerNode
.usage
.waitTime
.minimum
?? Infinity
410 ...this.workerNodes
.map(
411 workerNode
=> workerNode
.usage
.waitTime
.maximum
?? -Infinity
415 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
416 .waitTime
.average
&& {
419 this.workerNodes
.reduce
<number[]>(
420 (accumulator
, workerNode
) =>
421 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
427 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
428 .waitTime
.median
&& {
431 this.workerNodes
.reduce
<number[]>(
432 (accumulator
, workerNode
) =>
433 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
445 * The pool readiness boolean status.
447 private get
ready (): boolean {
452 this.workerNodes
.reduce(
453 (accumulator
, workerNode
) =>
454 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
458 ) >= this.minimumNumberOfWorkers
463 * The pool emptiness boolean status.
465 protected get
empty (): boolean {
466 return this.minimumNumberOfWorkers
=== 0 && this.workerNodes
.length
=== 0
470 * The approximate pool utilization.
472 * @returns The pool utilization.
474 private get
utilization (): number {
475 const poolTimeCapacity
=
476 (performance
.now() - this.startTimestamp
) *
477 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
478 const totalTasksRunTime
= this.workerNodes
.reduce(
479 (accumulator
, workerNode
) =>
480 accumulator
+ (workerNode
.usage
.runTime
.aggregate
?? 0),
483 const totalTasksWaitTime
= this.workerNodes
.reduce(
484 (accumulator
, workerNode
) =>
485 accumulator
+ (workerNode
.usage
.waitTime
.aggregate
?? 0),
488 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
494 * If it is `'dynamic'`, it provides the `max` property.
496 protected abstract get
type (): PoolType
501 protected abstract get
worker (): WorkerType
504 * Checks if the worker id sent in the received message from a worker is valid.
506 * @param message - The received message.
507 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
509 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
510 if (message
.workerId
== null) {
511 throw new Error('Worker message received without worker id')
512 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
514 `Worker message received from unknown worker '${message.workerId}'`
520 * Gets the worker node key given its worker id.
522 * @param workerId - The worker id.
523 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
525 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
526 return this.workerNodes
.findIndex(
527 workerNode
=> workerNode
.info
.id
=== workerId
532 public setWorkerChoiceStrategy (
533 workerChoiceStrategy
: WorkerChoiceStrategy
,
534 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
536 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
537 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
538 this.workerChoiceStrategyContext
?.setWorkerChoiceStrategy(
539 this.opts
.workerChoiceStrategy
541 if (workerChoiceStrategyOptions
!= null) {
542 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
544 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
545 workerNode
.resetUsage()
546 this.sendStatisticsMessageToWorker(workerNodeKey
)
551 public setWorkerChoiceStrategyOptions (
552 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
554 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
555 if (workerChoiceStrategyOptions
!= null) {
556 this.opts
.workerChoiceStrategyOptions
= workerChoiceStrategyOptions
558 this.workerChoiceStrategyContext
?.setOptions(
559 this.opts
.workerChoiceStrategyOptions
564 public enableTasksQueue (
566 tasksQueueOptions
?: TasksQueueOptions
568 if (this.opts
.enableTasksQueue
=== true && !enable
) {
569 this.unsetTaskStealing()
570 this.unsetTasksStealingOnBackPressure()
571 this.flushTasksQueues()
573 this.opts
.enableTasksQueue
= enable
574 this.setTasksQueueOptions(tasksQueueOptions
)
578 public setTasksQueueOptions (
579 tasksQueueOptions
: TasksQueueOptions
| undefined
581 if (this.opts
.enableTasksQueue
=== true) {
582 checkValidTasksQueueOptions(tasksQueueOptions
)
583 this.opts
.tasksQueueOptions
=
584 this.buildTasksQueueOptions(tasksQueueOptions
)
585 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
586 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
!)
587 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
588 this.unsetTaskStealing()
589 this.setTaskStealing()
591 this.unsetTaskStealing()
593 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
594 this.unsetTasksStealingOnBackPressure()
595 this.setTasksStealingOnBackPressure()
597 this.unsetTasksStealingOnBackPressure()
599 } else if (this.opts
.tasksQueueOptions
!= null) {
600 delete this.opts
.tasksQueueOptions
604 private buildTasksQueueOptions (
605 tasksQueueOptions
: TasksQueueOptions
| undefined
606 ): TasksQueueOptions
{
608 ...getDefaultTasksQueueOptions(
609 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
615 private setTasksQueueSize (size
: number): void {
616 for (const workerNode
of this.workerNodes
) {
617 workerNode
.tasksQueueBackPressureSize
= size
621 private setTaskStealing (): void {
622 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
623 this.workerNodes
[workerNodeKey
].on('idle', this.handleWorkerNodeIdleEvent
)
627 private unsetTaskStealing (): void {
628 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
629 this.workerNodes
[workerNodeKey
].off(
631 this.handleWorkerNodeIdleEvent
636 private setTasksStealingOnBackPressure (): void {
637 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
638 this.workerNodes
[workerNodeKey
].on(
640 this.handleWorkerNodeBackPressureEvent
645 private unsetTasksStealingOnBackPressure (): void {
646 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
647 this.workerNodes
[workerNodeKey
].off(
649 this.handleWorkerNodeBackPressureEvent
655 * Whether the pool is full or not.
657 * The pool filling boolean status.
659 protected get
full (): boolean {
661 this.workerNodes
.length
>=
662 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
667 * Whether the pool is busy or not.
669 * The pool busyness boolean status.
671 protected abstract get
busy (): boolean
674 * Whether worker nodes are executing concurrently their tasks quota or not.
676 * @returns Worker nodes busyness boolean status.
678 protected internalBusy (): boolean {
679 if (this.opts
.enableTasksQueue
=== true) {
681 this.workerNodes
.findIndex(
683 workerNode
.info
.ready
&&
684 workerNode
.usage
.tasks
.executing
<
685 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
686 this.opts
.tasksQueueOptions
!.concurrency
!
691 this.workerNodes
.findIndex(
693 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
698 private isWorkerNodeBusy (workerNodeKey
: number): boolean {
699 if (this.opts
.enableTasksQueue
=== true) {
701 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
>=
702 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
703 this.opts
.tasksQueueOptions
!.concurrency
!
706 return this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
> 0
709 private async sendTaskFunctionOperationToWorker (
710 workerNodeKey
: number,
711 message
: MessageValue
<Data
>
712 ): Promise
<boolean> {
713 return await new Promise
<boolean>((resolve
, reject
) => {
714 const taskFunctionOperationListener
= (
715 message
: MessageValue
<Response
>
717 this.checkMessageWorkerId(message
)
718 const workerId
= this.getWorkerInfo(workerNodeKey
)?.id
720 message
.taskFunctionOperationStatus
!= null &&
721 message
.workerId
=== workerId
723 if (message
.taskFunctionOperationStatus
) {
728 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
732 this.deregisterWorkerMessageListener(
733 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
734 taskFunctionOperationListener
738 this.registerWorkerMessageListener(
740 taskFunctionOperationListener
742 this.sendToWorker(workerNodeKey
, message
)
746 private async sendTaskFunctionOperationToWorkers (
747 message
: MessageValue
<Data
>
748 ): Promise
<boolean> {
749 return await new Promise
<boolean>((resolve
, reject
) => {
750 const responsesReceived
= new Array<MessageValue
<Response
>>()
751 const taskFunctionOperationsListener
= (
752 message
: MessageValue
<Response
>
754 this.checkMessageWorkerId(message
)
755 if (message
.taskFunctionOperationStatus
!= null) {
756 responsesReceived
.push(message
)
757 if (responsesReceived
.length
=== this.workerNodes
.length
) {
759 responsesReceived
.every(
760 message
=> message
.taskFunctionOperationStatus
=== true
765 responsesReceived
.some(
766 message
=> message
.taskFunctionOperationStatus
=== false
769 const errorResponse
= responsesReceived
.find(
770 response
=> response
.taskFunctionOperationStatus
=== false
774 `Task function operation '${
775 message.taskFunctionOperation as string
776 }' failed on worker ${errorResponse?.workerId} with error: '${
777 errorResponse?.workerError?.message
782 this.deregisterWorkerMessageListener(
783 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
784 taskFunctionOperationsListener
789 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
790 this.registerWorkerMessageListener(
792 taskFunctionOperationsListener
794 this.sendToWorker(workerNodeKey
, message
)
800 public hasTaskFunction (name
: string): boolean {
801 for (const workerNode
of this.workerNodes
) {
803 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
804 workerNode
.info
.taskFunctionNames
.includes(name
)
813 public async addTaskFunction (
815 fn
: TaskFunction
<Data
, Response
>
816 ): Promise
<boolean> {
817 if (typeof name
!== 'string') {
818 throw new TypeError('name argument must be a string')
820 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
821 throw new TypeError('name argument must not be an empty string')
823 if (typeof fn
!== 'function') {
824 throw new TypeError('fn argument must be a function')
826 const opResult
= await this.sendTaskFunctionOperationToWorkers({
827 taskFunctionOperation
: 'add',
828 taskFunctionName
: name
,
829 taskFunction
: fn
.toString()
831 this.taskFunctions
.set(name
, fn
)
836 public async removeTaskFunction (name
: string): Promise
<boolean> {
837 if (!this.taskFunctions
.has(name
)) {
839 'Cannot remove a task function not handled on the pool side'
842 const opResult
= await this.sendTaskFunctionOperationToWorkers({
843 taskFunctionOperation
: 'remove',
844 taskFunctionName
: name
846 this.deleteTaskFunctionWorkerUsages(name
)
847 this.taskFunctions
.delete(name
)
852 public listTaskFunctionNames (): string[] {
853 for (const workerNode
of this.workerNodes
) {
855 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
856 workerNode
.info
.taskFunctionNames
.length
> 0
858 return workerNode
.info
.taskFunctionNames
865 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
866 return await this.sendTaskFunctionOperationToWorkers({
867 taskFunctionOperation
: 'default',
868 taskFunctionName
: name
872 private deleteTaskFunctionWorkerUsages (name
: string): void {
873 for (const workerNode
of this.workerNodes
) {
874 workerNode
.deleteTaskFunctionWorkerUsage(name
)
878 private shallExecuteTask (workerNodeKey
: number): boolean {
880 this.tasksQueueSize(workerNodeKey
) === 0 &&
881 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
882 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
883 this.opts
.tasksQueueOptions
!.concurrency
!
888 public async execute (
891 transferList
?: TransferListItem
[]
892 ): Promise
<Response
> {
893 return await new Promise
<Response
>((resolve
, reject
) => {
895 reject(new Error('Cannot execute a task on not started pool'))
898 if (this.destroying
) {
899 reject(new Error('Cannot execute a task on destroying pool'))
902 if (name
!= null && typeof name
!== 'string') {
903 reject(new TypeError('name argument must be a string'))
908 typeof name
=== 'string' &&
909 name
.trim().length
=== 0
911 reject(new TypeError('name argument must not be an empty string'))
914 if (transferList
!= null && !Array.isArray(transferList
)) {
915 reject(new TypeError('transferList argument must be an array'))
918 const timestamp
= performance
.now()
919 const workerNodeKey
= this.chooseWorkerNode()
920 const task
: Task
<Data
> = {
921 name
: name
?? DEFAULT_TASK_NAME
,
922 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
923 data
: data
?? ({} as Data
),
928 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
929 this.promiseResponseMap
.set(task
.taskId
!, {
933 ...(this.emitter
!= null && {
934 asyncResource
: new AsyncResource('poolifier:task', {
935 triggerAsyncId
: this.emitter
.asyncId
,
936 requireManualDestroy
: true
941 this.opts
.enableTasksQueue
=== false ||
942 (this.opts
.enableTasksQueue
=== true &&
943 this.shallExecuteTask(workerNodeKey
))
945 this.executeTask(workerNodeKey
, task
)
947 this.enqueueTask(workerNodeKey
, task
)
953 public start (): void {
955 throw new Error('Cannot start an already started pool')
958 throw new Error('Cannot start an already starting pool')
960 if (this.destroying
) {
961 throw new Error('Cannot start a destroying pool')
965 this.workerNodes
.reduce(
966 (accumulator
, workerNode
) =>
967 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
969 ) < this.minimumNumberOfWorkers
971 this.createAndSetupWorkerNode()
973 this.starting
= false
978 public async destroy (): Promise
<void> {
980 throw new Error('Cannot destroy an already destroyed pool')
983 throw new Error('Cannot destroy an starting pool')
985 if (this.destroying
) {
986 throw new Error('Cannot destroy an already destroying pool')
988 this.destroying
= true
990 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
991 await this.destroyWorkerNode(workerNodeKey
)
994 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
995 this.emitter
?.emitDestroy()
996 this.readyEventEmitted
= false
997 this.destroying
= false
1001 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
1002 await new Promise
<void>((resolve
, reject
) => {
1003 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1004 if (this.workerNodes
[workerNodeKey
] == null) {
1008 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1009 this.checkMessageWorkerId(message
)
1010 if (message
.kill
=== 'success') {
1012 } else if (message
.kill
=== 'failure') {
1015 `Kill message handling failed on worker ${message.workerId}`
1020 // FIXME: should be registered only once
1021 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1022 this.sendToWorker(workerNodeKey
, { kill
: true })
1027 * Terminates the worker node given its worker node key.
1029 * @param workerNodeKey - The worker node key.
1031 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1032 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1033 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1034 const workerNode
= this.workerNodes
[workerNodeKey
]
1035 await waitWorkerNodeEvents(
1039 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1040 getDefaultTasksQueueOptions(
1041 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1042 ).tasksFinishedTimeout
1044 await this.sendKillMessageToWorker(workerNodeKey
)
1045 await workerNode
.terminate()
1049 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1050 * Can be overridden.
1054 protected setupHook (): void {
1055 /* Intentionally empty */
1059 * Should return whether the worker is the main worker or not.
1061 protected abstract isMain (): boolean
1064 * Hook executed before the worker task execution.
1065 * Can be overridden.
1067 * @param workerNodeKey - The worker node key.
1068 * @param task - The task to execute.
1070 protected beforeTaskExecutionHook (
1071 workerNodeKey
: number,
1074 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1075 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1076 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1077 ++workerUsage
.tasks
.executing
1078 updateWaitTimeWorkerUsage(
1079 this.workerChoiceStrategyContext
,
1085 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1086 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1087 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(task
.name
!) !=
1090 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1091 const taskFunctionWorkerUsage
= this.workerNodes
[
1093 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1094 ].getTaskFunctionWorkerUsage(task
.name
!)!
1095 ++taskFunctionWorkerUsage
.tasks
.executing
1096 updateWaitTimeWorkerUsage(
1097 this.workerChoiceStrategyContext
,
1098 taskFunctionWorkerUsage
,
1105 * Hook executed after the worker task execution.
1106 * Can be overridden.
1108 * @param workerNodeKey - The worker node key.
1109 * @param message - The received message.
1111 protected afterTaskExecutionHook (
1112 workerNodeKey
: number,
1113 message
: MessageValue
<Response
>
1115 let needWorkerChoiceStrategyUpdate
= false
1116 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1117 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1118 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1119 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1120 updateRunTimeWorkerUsage(
1121 this.workerChoiceStrategyContext
,
1125 updateEluWorkerUsage(
1126 this.workerChoiceStrategyContext
,
1130 needWorkerChoiceStrategyUpdate
= true
1133 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1134 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1135 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1136 message
.taskPerformance
!.name
1139 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1140 const taskFunctionWorkerUsage
= this.workerNodes
[
1142 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1143 ].getTaskFunctionWorkerUsage(message
.taskPerformance
!.name
)!
1144 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1145 updateRunTimeWorkerUsage(
1146 this.workerChoiceStrategyContext
,
1147 taskFunctionWorkerUsage
,
1150 updateEluWorkerUsage(
1151 this.workerChoiceStrategyContext
,
1152 taskFunctionWorkerUsage
,
1155 needWorkerChoiceStrategyUpdate
= true
1157 if (needWorkerChoiceStrategyUpdate
) {
1158 this.workerChoiceStrategyContext
?.update(workerNodeKey
)
1163 * Whether the worker node shall update its task function worker usage or not.
1165 * @param workerNodeKey - The worker node key.
1166 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1168 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1169 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1171 workerInfo
!= null &&
1172 Array.isArray(workerInfo
.taskFunctionNames
) &&
1173 workerInfo
.taskFunctionNames
.length
> 2
1178 * Chooses a worker node for the next task.
1180 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1182 * @returns The chosen worker node key
1184 private chooseWorkerNode (): number {
1185 if (this.shallCreateDynamicWorker()) {
1186 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1188 this.workerChoiceStrategyContext
?.getStrategyPolicy()
1189 .dynamicWorkerUsage
=== true
1191 return workerNodeKey
1194 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1195 return this.workerChoiceStrategyContext
!.execute()
1199 * Conditions for dynamic worker creation.
1201 * @returns Whether to create a dynamic worker or not.
1203 protected abstract shallCreateDynamicWorker (): boolean
1206 * Sends a message to worker given its worker node key.
1208 * @param workerNodeKey - The worker node key.
1209 * @param message - The message.
1210 * @param transferList - The optional array of transferable objects.
1212 protected abstract sendToWorker (
1213 workerNodeKey
: number,
1214 message
: MessageValue
<Data
>,
1215 transferList
?: TransferListItem
[]
1219 * Creates a new, completely set up worker node.
1221 * @returns New, completely set up worker node key.
1223 protected createAndSetupWorkerNode (): number {
1224 const workerNode
= this.createWorkerNode()
1225 workerNode
.registerWorkerEventHandler(
1227 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1229 workerNode
.registerWorkerEventHandler(
1231 this.opts
.messageHandler
?? EMPTY_FUNCTION
1233 workerNode
.registerWorkerEventHandler(
1235 this.opts
.errorHandler
?? EMPTY_FUNCTION
1237 workerNode
.registerWorkerEventHandler('error', (error
: Error) => {
1238 workerNode
.info
.ready
= false
1239 this.emitter
?.emit(PoolEvents
.error
, error
)
1243 this.opts
.restartWorkerOnError
=== true
1245 if (workerNode
.info
.dynamic
) {
1246 this.createAndSetupDynamicWorkerNode()
1248 this.createAndSetupWorkerNode()
1254 this.opts
.enableTasksQueue
=== true
1256 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
1258 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1259 workerNode
?.terminate().catch((error
: unknown
) => {
1260 this.emitter
?.emit(PoolEvents
.error
, error
)
1263 workerNode
.registerWorkerEventHandler(
1265 this.opts
.exitHandler
?? EMPTY_FUNCTION
1267 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1268 this.removeWorkerNode(workerNode
)
1270 const workerNodeKey
= this.addWorkerNode(workerNode
)
1271 this.afterWorkerNodeSetup(workerNodeKey
)
1272 return workerNodeKey
1276 * Creates a new, completely set up dynamic worker node.
1278 * @returns New, completely set up dynamic worker node key.
1280 protected createAndSetupDynamicWorkerNode (): number {
1281 const workerNodeKey
= this.createAndSetupWorkerNode()
1282 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1283 this.checkMessageWorkerId(message
)
1284 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1287 const workerUsage
= this.workerNodes
[localWorkerNodeKey
]?.usage
1288 // Kill message received from worker
1290 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1291 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1292 ((this.opts
.enableTasksQueue
=== false &&
1293 workerUsage
.tasks
.executing
=== 0) ||
1294 (this.opts
.enableTasksQueue
=== true &&
1295 workerUsage
.tasks
.executing
=== 0 &&
1296 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1298 // Flag the worker node as not ready immediately
1299 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1300 this.destroyWorkerNode(localWorkerNodeKey
).catch((error
: unknown
) => {
1301 this.emitter
?.emit(PoolEvents
.error
, error
)
1305 this.sendToWorker(workerNodeKey
, {
1308 if (this.taskFunctions
.size
> 0) {
1309 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1310 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1311 taskFunctionOperation
: 'add',
1313 taskFunction
: taskFunction
.toString()
1314 }).catch((error
: unknown
) => {
1315 this.emitter
?.emit(PoolEvents
.error
, error
)
1319 const workerNode
= this.workerNodes
[workerNodeKey
]
1320 workerNode
.info
.dynamic
= true
1322 this.workerChoiceStrategyContext
?.getStrategyPolicy()
1323 .dynamicWorkerReady
=== true ||
1324 this.workerChoiceStrategyContext
?.getStrategyPolicy()
1325 .dynamicWorkerUsage
=== true
1327 workerNode
.info
.ready
= true
1329 this.checkAndEmitDynamicWorkerCreationEvents()
1330 return workerNodeKey
1334 * Registers a listener callback on the worker given its worker node key.
1336 * @param workerNodeKey - The worker node key.
1337 * @param listener - The message listener callback.
1339 protected abstract registerWorkerMessageListener
<
1340 Message
extends Data
| Response
1342 workerNodeKey
: number,
1343 listener
: (message
: MessageValue
<Message
>) => void
1347 * Registers once a listener callback on the worker given its worker node key.
1349 * @param workerNodeKey - The worker node key.
1350 * @param listener - The message listener callback.
1352 protected abstract registerOnceWorkerMessageListener
<
1353 Message
extends Data
| Response
1355 workerNodeKey
: number,
1356 listener
: (message
: MessageValue
<Message
>) => void
1360 * Deregisters a listener callback on the worker given its worker node key.
1362 * @param workerNodeKey - The worker node key.
1363 * @param listener - The message listener callback.
1365 protected abstract deregisterWorkerMessageListener
<
1366 Message
extends Data
| Response
1368 workerNodeKey
: number,
1369 listener
: (message
: MessageValue
<Message
>) => void
1373 * Method hooked up after a worker node has been newly created.
1374 * Can be overridden.
1376 * @param workerNodeKey - The newly created worker node key.
1378 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1379 // Listen to worker messages.
1380 this.registerWorkerMessageListener(
1382 this.workerMessageListener
1384 // Send the startup message to worker.
1385 this.sendStartupMessageToWorker(workerNodeKey
)
1386 // Send the statistics message to worker.
1387 this.sendStatisticsMessageToWorker(workerNodeKey
)
1388 if (this.opts
.enableTasksQueue
=== true) {
1389 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1390 this.workerNodes
[workerNodeKey
].on(
1392 this.handleWorkerNodeIdleEvent
1395 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1396 this.workerNodes
[workerNodeKey
].on(
1398 this.handleWorkerNodeBackPressureEvent
1405 * Sends the startup message to worker given its worker node key.
1407 * @param workerNodeKey - The worker node key.
1409 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1412 * Sends the statistics message to worker given its worker node key.
1414 * @param workerNodeKey - The worker node key.
1416 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1417 this.sendToWorker(workerNodeKey
, {
1420 this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
1421 .runTime
.aggregate
?? false,
1423 this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements().elu
1429 private cannotStealTask (): boolean {
1430 return this.workerNodes
.length
<= 1 || this.info
.queuedTasks
=== 0
1433 private handleTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1434 if (this.shallExecuteTask(workerNodeKey
)) {
1435 this.executeTask(workerNodeKey
, task
)
1437 this.enqueueTask(workerNodeKey
, task
)
1441 private redistributeQueuedTasks (workerNodeKey
: number): void {
1442 if (workerNodeKey
=== -1 || this.cannotStealTask()) {
1445 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1446 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1447 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1448 return workerNode
.info
.ready
&&
1449 workerNode
.usage
.tasks
.queued
<
1450 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1457 destinationWorkerNodeKey
,
1458 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1459 this.dequeueTask(workerNodeKey
)!
1464 private updateTaskStolenStatisticsWorkerUsage (
1465 workerNodeKey
: number,
1468 const workerNode
= this.workerNodes
[workerNodeKey
]
1469 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1470 if (workerNode
?.usage
!= null) {
1471 ++workerNode
.usage
.tasks
.stolen
1474 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1475 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1477 const taskFunctionWorkerUsage
=
1478 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1479 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1480 ++taskFunctionWorkerUsage
.tasks
.stolen
1484 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1485 workerNodeKey
: number
1487 const workerNode
= this.workerNodes
[workerNodeKey
]
1488 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1489 if (workerNode
?.usage
!= null) {
1490 ++workerNode
.usage
.tasks
.sequentiallyStolen
1494 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1495 workerNodeKey
: number,
1498 const workerNode
= this.workerNodes
[workerNodeKey
]
1500 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1501 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1503 const taskFunctionWorkerUsage
=
1504 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1505 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1506 ++taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
1510 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1511 workerNodeKey
: number
1513 const workerNode
= this.workerNodes
[workerNodeKey
]
1514 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1515 if (workerNode
?.usage
!= null) {
1516 workerNode
.usage
.tasks
.sequentiallyStolen
= 0
1520 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1521 workerNodeKey
: number,
1524 const workerNode
= this.workerNodes
[workerNodeKey
]
1526 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1527 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1529 const taskFunctionWorkerUsage
=
1530 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1531 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1532 taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
= 0
1536 private readonly handleWorkerNodeIdleEvent
= (
1537 eventDetail
: WorkerNodeEventDetail
,
1538 previousStolenTask
?: Task
<Data
>
1540 const { workerNodeKey
} = eventDetail
1541 if (workerNodeKey
== null) {
1543 "WorkerNode event detail 'workerNodeKey' property must be defined"
1546 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1548 this.cannotStealTask() ||
1549 (this.info
.stealingWorkerNodes
?? 0) >
1550 Math.floor(this.workerNodes
.length
/ 2)
1552 if (workerInfo
!= null && previousStolenTask
!= null) {
1553 workerInfo
.stealing
= false
1557 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1559 workerInfo
!= null &&
1560 previousStolenTask
!= null &&
1561 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1562 (workerNodeTasksUsage
.executing
> 0 ||
1563 this.tasksQueueSize(workerNodeKey
) > 0)
1565 workerInfo
.stealing
= false
1566 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1567 for (const taskName
of workerInfo
.taskFunctionNames
!) {
1568 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1573 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1576 if (workerInfo
== null) {
1578 `Worker node with key '${workerNodeKey}' not found in pool`
1581 workerInfo
.stealing
= true
1582 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1584 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1587 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1588 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1590 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1591 ].getTaskFunctionWorkerUsage(stolenTask
.name
!)!.tasks
1593 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1594 (previousStolenTask
!= null &&
1595 previousStolenTask
.name
=== stolenTask
.name
&&
1596 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1598 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1600 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1604 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1606 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1611 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1613 this.handleWorkerNodeIdleEvent(eventDetail
, stolenTask
)
1616 .catch((error
: unknown
) => {
1617 this.emitter
?.emit(PoolEvents
.error
, error
)
1621 private readonly workerNodeStealTask
= (
1622 workerNodeKey
: number
1623 ): Task
<Data
> | undefined => {
1624 const workerNodes
= this.workerNodes
1627 (workerNodeA
, workerNodeB
) =>
1628 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1630 const sourceWorkerNode
= workerNodes
.find(
1631 (sourceWorkerNode
, sourceWorkerNodeKey
) =>
1632 sourceWorkerNode
.info
.ready
&&
1633 !sourceWorkerNode
.info
.stealing
&&
1634 sourceWorkerNodeKey
!== workerNodeKey
&&
1635 sourceWorkerNode
.usage
.tasks
.queued
> 0
1637 if (sourceWorkerNode
!= null) {
1638 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1639 const task
= sourceWorkerNode
.popTask()!
1640 this.handleTask(workerNodeKey
, task
)
1641 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1642 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1643 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey
, task
.name
!)
1648 private readonly handleWorkerNodeBackPressureEvent
= (
1649 eventDetail
: WorkerNodeEventDetail
1652 this.cannotStealTask() ||
1653 (this.info
.stealingWorkerNodes
?? 0) >
1654 Math.floor(this.workerNodes
.length
/ 2)
1658 const { workerId
} = eventDetail
1659 const sizeOffset
= 1
1660 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1661 if (this.opts
.tasksQueueOptions
!.size
! <= sizeOffset
) {
1664 const sourceWorkerNode
=
1665 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1666 const workerNodes
= this.workerNodes
1669 (workerNodeA
, workerNodeB
) =>
1670 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1672 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1674 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1675 workerNode
.info
.ready
&&
1676 !workerNode
.info
.stealing
&&
1677 workerNode
.info
.id
!== workerId
&&
1678 workerNode
.usage
.tasks
.queued
<
1679 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1680 this.opts
.tasksQueueOptions
!.size
! - sizeOffset
1682 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1683 if (workerInfo
== null) {
1685 `Worker node with key '${workerNodeKey}' not found in pool`
1688 workerInfo
.stealing
= true
1689 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1690 const task
= sourceWorkerNode
.popTask()!
1691 this.handleTask(workerNodeKey
, task
)
1692 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1693 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey
, task
.name
!)
1694 workerInfo
.stealing
= false
1700 * This method is the message listener registered on each worker.
1702 protected readonly workerMessageListener
= (
1703 message
: MessageValue
<Response
>
1705 this.checkMessageWorkerId(message
)
1706 const { workerId
, ready
, taskId
, taskFunctionNames
} = message
1707 if (ready
!= null && taskFunctionNames
!= null) {
1708 // Worker ready response received from worker
1709 this.handleWorkerReadyResponse(message
)
1710 } else if (taskId
!= null) {
1711 // Task execution response received from worker
1712 this.handleTaskExecutionResponse(message
)
1713 } else if (taskFunctionNames
!= null) {
1714 // Task function names message received from worker
1715 const workerInfo
= this.getWorkerInfo(
1716 this.getWorkerNodeKeyByWorkerId(workerId
)
1718 if (workerInfo
!= null) {
1719 workerInfo
.taskFunctionNames
= taskFunctionNames
1724 private checkAndEmitReadyEvent (): void {
1725 if (!this.readyEventEmitted
&& this.ready
) {
1726 this.emitter
?.emit(PoolEvents
.ready
, this.info
)
1727 this.readyEventEmitted
= true
1731 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1732 const { workerId
, ready
, taskFunctionNames
} = message
1733 if (ready
== null || !ready
) {
1734 throw new Error(`Worker ${workerId} failed to initialize`)
1737 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1738 workerNode
.info
.ready
= ready
1739 workerNode
.info
.taskFunctionNames
= taskFunctionNames
1740 this.checkAndEmitReadyEvent()
1743 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1744 const { workerId
, taskId
, workerError
, data
} = message
1745 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1746 const promiseResponse
= this.promiseResponseMap
.get(taskId
!)
1747 if (promiseResponse
!= null) {
1748 const { resolve
, reject
, workerNodeKey
, asyncResource
} = promiseResponse
1749 const workerNode
= this.workerNodes
[workerNodeKey
]
1750 if (workerError
!= null) {
1751 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1752 asyncResource
!= null
1753 ? asyncResource
.runInAsyncScope(
1758 : reject(workerError
.message
)
1760 asyncResource
!= null
1761 ? asyncResource
.runInAsyncScope(resolve
, this.emitter
, data
)
1762 : resolve(data
as Response
)
1764 asyncResource
?.emitDestroy()
1765 this.afterTaskExecutionHook(workerNodeKey
, message
)
1766 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1767 this.promiseResponseMap
.delete(taskId
!)
1768 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1769 workerNode
?.emit('taskFinished', taskId
)
1771 this.opts
.enableTasksQueue
=== true &&
1773 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1776 const workerNodeTasksUsage
= workerNode
.usage
.tasks
1778 this.tasksQueueSize(workerNodeKey
) > 0 &&
1779 workerNodeTasksUsage
.executing
<
1780 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1781 this.opts
.tasksQueueOptions
!.concurrency
!
1783 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1784 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
1787 workerNodeTasksUsage
.executing
=== 0 &&
1788 this.tasksQueueSize(workerNodeKey
) === 0 &&
1789 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1791 workerNode
.emit('idle', {
1800 private checkAndEmitTaskExecutionEvents (): void {
1802 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1806 private checkAndEmitTaskQueuingEvents (): void {
1807 if (this.hasBackPressure()) {
1808 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1813 * Emits dynamic worker creation events.
1815 protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
1818 * Gets the worker information given its worker node key.
1820 * @param workerNodeKey - The worker node key.
1821 * @returns The worker information.
1823 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
| undefined {
1824 return this.workerNodes
[workerNodeKey
]?.info
1828 * Creates a worker node.
1830 * @returns The created worker node.
1832 private createWorkerNode (): IWorkerNode
<Worker
, Data
> {
1833 const workerNode
= new WorkerNode
<Worker
, Data
>(
1838 workerOptions
: this.opts
.workerOptions
,
1839 tasksQueueBackPressureSize
:
1840 this.opts
.tasksQueueOptions
?.size
??
1841 getDefaultTasksQueueOptions(
1842 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1846 // Flag the worker node as ready at pool startup.
1847 if (this.starting
) {
1848 workerNode
.info
.ready
= true
1854 * Adds the given worker node in the pool worker nodes.
1856 * @param workerNode - The worker node.
1857 * @returns The added worker node key.
1858 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1860 private addWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): number {
1861 this.workerNodes
.push(workerNode
)
1862 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
1863 if (workerNodeKey
=== -1) {
1864 throw new Error('Worker added not found in worker nodes')
1866 return workerNodeKey
1869 private checkAndEmitEmptyEvent (): void {
1871 this.emitter
?.emit(PoolEvents
.empty
, this.info
)
1872 this.readyEventEmitted
= false
1877 * Removes the worker node from the pool worker nodes.
1879 * @param workerNode - The worker node.
1881 private removeWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): void {
1882 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
1883 if (workerNodeKey
!== -1) {
1884 this.workerNodes
.splice(workerNodeKey
, 1)
1885 this.workerChoiceStrategyContext
?.remove(workerNodeKey
)
1887 this.checkAndEmitEmptyEvent()
1890 protected flagWorkerNodeAsNotReady (workerNodeKey
: number): void {
1891 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1892 if (workerInfo
!= null) {
1893 workerInfo
.ready
= false
1897 private hasBackPressure (): boolean {
1899 this.opts
.enableTasksQueue
=== true &&
1900 this.workerNodes
.findIndex(
1901 workerNode
=> !workerNode
.hasBackPressure()
1907 * Executes the given task on the worker given its worker node key.
1909 * @param workerNodeKey - The worker node key.
1910 * @param task - The task to execute.
1912 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1913 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1914 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1915 this.checkAndEmitTaskExecutionEvents()
1918 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1919 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1920 this.checkAndEmitTaskQueuingEvents()
1921 return tasksQueueSize
1924 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1925 return this.workerNodes
[workerNodeKey
].dequeueTask()
1928 private tasksQueueSize (workerNodeKey
: number): number {
1929 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1932 protected flushTasksQueue (workerNodeKey
: number): number {
1933 let flushedTasks
= 0
1934 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1935 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1936 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
1939 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1943 private flushTasksQueues (): void {
1944 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1945 this.flushTasksQueue(workerNodeKey
)