1 import { AsyncResource
} from
'node:async_hooks'
2 import { randomUUID
} from
'node:crypto'
3 import { EventEmitterAsyncResource
} from
'node:events'
4 import { performance
} from
'node:perf_hooks'
5 import type { TransferListItem
} from
'node:worker_threads'
9 PromiseResponseWrapper
,
11 } from
'../utility-types.js'
25 import type { TaskFunction
} from
'../worker/task-functions.js'
26 import { KillBehaviors
} from
'../worker/worker-options.js'
34 type TasksQueueOptions
38 WorkerChoiceStrategies
,
39 type WorkerChoiceStrategy
,
40 type WorkerChoiceStrategyOptions
41 } from
'./selection-strategies/selection-strategies-types.js'
42 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
45 checkValidTasksQueueOptions
,
46 checkValidWorkerChoiceStrategy
,
47 getDefaultTasksQueueOptions
,
49 updateRunTimeWorkerUsage
,
50 updateTaskStatisticsWorkerUsage
,
51 updateWaitTimeWorkerUsage
,
54 import { version
} from
'./version.js'
59 WorkerNodeEventDetail
,
62 import { WorkerNode
} from
'./worker-node.js'
65 * Base class that implements some shared logic for all poolifier pools.
67 * @typeParam Worker - Type of worker which manages this pool.
68 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
69 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
71 export abstract class AbstractPool
<
72 Worker
extends IWorker
,
75 > implements IPool
<Worker
, Data
, Response
> {
77 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
80 public emitter
?: EventEmitterAsyncResource
83 * The task execution response promise map:
84 * - `key`: The message id of each submitted task.
85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
89 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
90 new Map
<string, PromiseResponseWrapper
<Response
>>()
93 * Worker choice strategy context referencing a worker choice algorithm implementation.
95 protected workerChoiceStrategyContext
?: WorkerChoiceStrategyContext
<
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
106 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
109 * Whether the pool is started or not.
111 private started
: boolean
113 * Whether the pool is starting or not.
115 private starting
: boolean
117 * Whether the pool is destroying or not.
119 private destroying
: boolean
121 * Whether the pool ready event has been emitted or not.
123 private readyEventEmitted
: boolean
125 * The start timestamp of the pool.
127 private readonly startTimestamp
130 * Constructs a new poolifier pool.
132 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
133 * @param filePath - Path to the worker file.
134 * @param opts - Options for the pool.
135 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
138 protected readonly minimumNumberOfWorkers
: number,
139 protected readonly filePath
: string,
140 protected readonly opts
: PoolOptions
<Worker
>,
141 protected readonly maximumNumberOfWorkers
?: number
143 if (!this.isMain()) {
145 'Cannot start a pool from a worker with the same type as the pool'
149 checkFilePath(this.filePath
)
150 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
151 this.checkPoolOptions(this.opts
)
153 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
154 this.executeTask
= this.executeTask
.bind(this)
155 this.enqueueTask
= this.enqueueTask
.bind(this)
157 if (this.opts
.enableEvents
=== true) {
158 this.initializeEventEmitter()
160 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
166 this.opts
.workerChoiceStrategy
,
167 this.opts
.workerChoiceStrategyOptions
172 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
175 this.starting
= false
176 this.destroying
= false
177 this.readyEventEmitted
= false
178 if (this.opts
.startWorkers
=== true) {
182 this.startTimestamp
= performance
.now()
185 private checkPoolType (): void {
186 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
188 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
193 private checkMinimumNumberOfWorkers (
194 minimumNumberOfWorkers
: number | undefined
196 if (minimumNumberOfWorkers
== null) {
198 'Cannot instantiate a pool without specifying the number of workers'
200 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
202 'Cannot instantiate a pool with a non safe integer number of workers'
204 } else if (minimumNumberOfWorkers
< 0) {
205 throw new RangeError(
206 'Cannot instantiate a pool with a negative number of workers'
208 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
209 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
/**
 * Validates the given pool options and stores the effective values on `this.opts`.
 *
 * Fallbacks applied when an option is not provided: `startWorkers` and
 * `restartWorkerOnError` and `enableEvents` default to `true`,
 * `workerChoiceStrategy` defaults to round robin and `enableTasksQueue`
 * defaults to `false`. Tasks queue options are only checked and built when
 * the tasks queue ends up enabled.
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  // Reject anything that is not a plain object before touching any option.
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options.
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(opts.workerChoiceStrategyOptions)
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }

  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options only matter once the queue is enabled.
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions
    )
  }
}
239 private checkValidWorkerChoiceStrategyOptions (
240 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
243 workerChoiceStrategyOptions
!= null &&
244 !isPlainObject(workerChoiceStrategyOptions
)
247 'Invalid worker choice strategy options: must be a plain object'
251 workerChoiceStrategyOptions
?.weights
!= null &&
252 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
253 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
256 'Invalid worker choice strategy options: must have a weight for each worker node'
260 workerChoiceStrategyOptions
?.measurement
!= null &&
261 !Object.values(Measurements
).includes(
262 workerChoiceStrategyOptions
.measurement
266 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
/**
 * Creates the pool event emitter and stores it in `this.emitter`.
 * The async resource name is built from the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
278 public get
info (): PoolInfo
{
283 started
: this.started
,
285 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
286 strategy
: this.opts
.workerChoiceStrategy
!,
287 strategyRetries
: this.workerChoiceStrategyContext
?.retriesCount
?? 0,
288 minSize
: this.minimumNumberOfWorkers
,
289 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
290 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
291 .runTime
.aggregate
=== true &&
292 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
293 .waitTime
.aggregate
&& {
294 utilization
: round(this.utilization
)
296 workerNodes
: this.workerNodes
.length
,
297 idleWorkerNodes
: this.workerNodes
.reduce(
298 (accumulator
, workerNode
) =>
299 workerNode
.usage
.tasks
.executing
=== 0
304 ...(this.opts
.enableTasksQueue
=== true && {
305 stealingWorkerNodes
: this.workerNodes
.reduce(
306 (accumulator
, workerNode
) =>
307 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
311 busyWorkerNodes
: this.workerNodes
.reduce(
312 (accumulator
, _workerNode
, workerNodeKey
) =>
313 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
316 executedTasks
: this.workerNodes
.reduce(
317 (accumulator
, workerNode
) =>
318 accumulator
+ workerNode
.usage
.tasks
.executed
,
321 executingTasks
: this.workerNodes
.reduce(
322 (accumulator
, workerNode
) =>
323 accumulator
+ workerNode
.usage
.tasks
.executing
,
326 ...(this.opts
.enableTasksQueue
=== true && {
327 queuedTasks
: this.workerNodes
.reduce(
328 (accumulator
, workerNode
) =>
329 accumulator
+ workerNode
.usage
.tasks
.queued
,
333 ...(this.opts
.enableTasksQueue
=== true && {
334 maxQueuedTasks
: this.workerNodes
.reduce(
335 (accumulator
, workerNode
) =>
336 accumulator
+ (workerNode
.usage
.tasks
.maxQueued
?? 0),
340 ...(this.opts
.enableTasksQueue
=== true && {
341 backPressure
: this.hasBackPressure()
343 ...(this.opts
.enableTasksQueue
=== true && {
344 stolenTasks
: this.workerNodes
.reduce(
345 (accumulator
, workerNode
) =>
346 accumulator
+ workerNode
.usage
.tasks
.stolen
,
350 failedTasks
: this.workerNodes
.reduce(
351 (accumulator
, workerNode
) =>
352 accumulator
+ workerNode
.usage
.tasks
.failed
,
355 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
356 .runTime
.aggregate
=== true && {
360 ...this.workerNodes
.map(
361 workerNode
=> workerNode
.usage
.runTime
.minimum
?? Infinity
367 ...this.workerNodes
.map(
368 workerNode
=> workerNode
.usage
.runTime
.maximum
?? -Infinity
372 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
373 .runTime
.average
&& {
376 this.workerNodes
.reduce
<number[]>(
377 (accumulator
, workerNode
) =>
378 accumulator
.concat(workerNode
.usage
.runTime
.history
),
384 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
388 this.workerNodes
.reduce
<number[]>(
389 (accumulator
, workerNode
) =>
390 accumulator
.concat(workerNode
.usage
.runTime
.history
),
398 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
399 .waitTime
.aggregate
=== true && {
403 ...this.workerNodes
.map(
404 workerNode
=> workerNode
.usage
.waitTime
.minimum
?? Infinity
410 ...this.workerNodes
.map(
411 workerNode
=> workerNode
.usage
.waitTime
.maximum
?? -Infinity
415 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
416 .waitTime
.average
&& {
419 this.workerNodes
.reduce
<number[]>(
420 (accumulator
, workerNode
) =>
421 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
427 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
428 .waitTime
.median
&& {
431 this.workerNodes
.reduce
<number[]>(
432 (accumulator
, workerNode
) =>
433 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
445 * The pool readiness boolean status.
447 private get
ready (): boolean {
452 this.workerNodes
.reduce(
453 (accumulator
, workerNode
) =>
454 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
458 ) >= this.minimumNumberOfWorkers
/**
 * The pool emptiness boolean status.
 *
 * The pool is empty when its minimum number of workers is zero and it
 * currently holds no worker node.
 */
protected get empty (): boolean {
  if (this.minimumNumberOfWorkers !== 0) {
    return false
  }
  return this.workerNodes.length === 0
}
/**
 * The approximate pool utilization.
 *
 * Computed as the sum of the worker nodes aggregate run time and aggregate
 * wait time, divided by the pool time capacity (time elapsed since
 * `startTimestamp` multiplied by the maximum, or else minimum, number of workers).
 * A worker node without an aggregate value counts as zero.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity =
    elapsedTime * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  // The two sums are kept separate so the additions happen in the same order
  // as a per-metric fold.
  let totalTasksRunTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksWaitTime += usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
494 * If it is `'dynamic'`, it provides the `max` property.
496 protected abstract get
type (): PoolType
501 protected abstract get
worker (): WorkerType
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * A worker id is valid when it is defined and matches one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const isKnownWorker = this.getWorkerNodeKeyByWorkerId(workerId) !== -1
  if (!isKnownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  // The worker node key is the index of the worker node in `this.workerNodes`.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
/**
 * Sets the worker choice strategy used by the pool.
 *
 * After the strategy (and optionally its options) is switched, the usage of
 * every worker node is reset and a statistics message is sent to each worker.
 *
 * @param workerChoiceStrategy - The worker choice strategy to use.
 * @param workerChoiceStrategyOptions - Optional worker choice strategy options to apply.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
/**
 * Sets the worker choice strategy options used by the pool.
 *
 * The options are validated first. When defined they replace the ones held in
 * `this.opts`; the worker choice strategy context, if any, is then given the
 * options currently held in `this.opts`.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  if (workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  const strategyContext = this.workerChoiceStrategyContext
  if (strategyContext != null) {
    strategyContext.setOptions(this.opts.workerChoiceStrategyOptions)
  }
}
/**
 * Enables or disables the worker node tasks queue in the pool.
 *
 * When an enabled tasks queue gets disabled, task stealing handlers are
 * removed and the tasks queues are flushed before the option is switched.
 *
 * @param enable - Whether to enable or disable the worker node tasks queue.
 * @param tasksQueueOptions - The worker node tasks queue options.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
/**
 * Sets the worker node tasks queue options in the pool.
 *
 * With the tasks queue disabled, any stored tasks queue options are deleted.
 * Otherwise the options are validated, completed by `buildTasksQueueOptions()`,
 * the queue size is propagated to the worker nodes and the task stealing
 * handlers are (re)installed or removed according to the built options.
 *
 * @param tasksQueueOptions - The worker node tasks queue options.
 */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
  // Handlers are always removed first so that enabling never registers them twice.
  this.unsetTaskStealing()
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
604 private buildTasksQueueOptions (
605 tasksQueueOptions
: TasksQueueOptions
| undefined
606 ): TasksQueueOptions
{
608 ...getDefaultTasksQueueOptions(
609 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The value assigned to each worker node `tasksQueueBackPressureSize`.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
/**
 * Registers `handleWorkerNodeIdleEvent` as `'idle'` event listener on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
627 private unsetTaskStealing (): void {
628 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
629 this.workerNodes
[workerNodeKey
].off(
631 this.handleWorkerNodeIdleEvent
636 private setTasksStealingOnBackPressure (): void {
637 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
638 this.workerNodes
[workerNodeKey
].on(
640 this.handleWorkerNodeBackPressureEvent
645 private unsetTasksStealingOnBackPressure (): void {
646 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
647 this.workerNodes
[workerNodeKey
].off(
649 this.handleWorkerNodeBackPressureEvent
/**
 * Whether the pool is full or not.
 *
 * The pool filling boolean status: `true` once the number of worker nodes
 * reaches the maximum number of workers, or the minimum one when no maximum
 * is defined.
 */
protected get full (): boolean {
  const maximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= maximumSize
}
667 * Whether the pool is busy or not.
669 * The pool busyness boolean status.
671 protected abstract get
busy (): boolean
674 * Whether worker nodes are executing concurrently their tasks quota or not.
676 * @returns Worker nodes busyness boolean status.
678 protected internalBusy (): boolean {
679 if (this.opts
.enableTasksQueue
=== true) {
681 this.workerNodes
.findIndex(
683 workerNode
.info
.ready
&&
684 workerNode
.usage
.tasks
.executing
<
685 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
686 this.opts
.tasksQueueOptions
!.concurrency
!
691 this.workerNodes
.findIndex(
693 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
/**
 * Whether the worker node given its worker node key is busy or not.
 *
 * With the tasks queue enabled, a worker node is busy once its number of
 * executing tasks reaches the tasks queue `concurrency` option; otherwise it
 * is busy as soon as it executes at least one task.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return this.opts.enableTasksQueue === true
    ? // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    executing >= this.opts.tasksQueueOptions!.concurrency!
    : executing > 0
}
709 private async sendTaskFunctionOperationToWorker (
710 workerNodeKey
: number,
711 message
: MessageValue
<Data
>
712 ): Promise
<boolean> {
713 return await new Promise
<boolean>((resolve
, reject
) => {
714 const taskFunctionOperationListener
= (
715 message
: MessageValue
<Response
>
717 this.checkMessageWorkerId(message
)
718 const workerId
= this.getWorkerInfo(workerNodeKey
)?.id
720 message
.taskFunctionOperationStatus
!= null &&
721 message
.workerId
=== workerId
723 if (message
.taskFunctionOperationStatus
) {
728 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
732 this.deregisterWorkerMessageListener(
733 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
734 taskFunctionOperationListener
738 this.registerWorkerMessageListener(
740 taskFunctionOperationListener
742 this.sendToWorker(workerNodeKey
, message
)
746 private async sendTaskFunctionOperationToWorkers (
747 message
: MessageValue
<Data
>
748 ): Promise
<boolean> {
749 return await new Promise
<boolean>((resolve
, reject
) => {
750 const responsesReceived
= new Array<MessageValue
<Response
>>()
751 const taskFunctionOperationsListener
= (
752 message
: MessageValue
<Response
>
754 this.checkMessageWorkerId(message
)
755 if (message
.taskFunctionOperationStatus
!= null) {
756 responsesReceived
.push(message
)
757 if (responsesReceived
.length
=== this.workerNodes
.length
) {
759 responsesReceived
.every(
760 message
=> message
.taskFunctionOperationStatus
=== true
765 responsesReceived
.some(
766 message
=> message
.taskFunctionOperationStatus
=== false
769 const errorResponse
= responsesReceived
.find(
770 response
=> response
.taskFunctionOperationStatus
=== false
774 `Task function operation '${
775 message.taskFunctionOperation as string
776 }' failed on worker ${errorResponse?.workerId} with error: '${
777 errorResponse?.workerError?.message
782 this.deregisterWorkerMessageListener(
783 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
784 taskFunctionOperationsListener
789 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
790 this.registerWorkerMessageListener(
792 taskFunctionOperationsListener
794 this.sendToWorker(workerNodeKey
, message
)
/**
 * Whether the specified task function exists in the pool.
 *
 * @param name - The name of the task function.
 * @returns `true` if at least one worker node lists `name` in its task function names, `false` otherwise.
 */
public hasTaskFunction (name: string): boolean {
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
/**
 * Adds a task function to the pool.
 *
 * The function source (`fn.toString()`) is sent to the workers through an
 * `'add'` task function operation, then the function is recorded in the
 * pool side task functions map under `name`.
 *
 * @param name - The name of the task function.
 * @param fn - The task function.
 * @returns The result of the task function operation on the workers.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `name` is not a string or is an empty string.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `fn` is not a function.
 */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // `name` is known to be a string from here on.
  const isBlankName = name.trim().length === 0
  if (isBlankName) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  this.taskFunctions.set(name, fn)
  return opResult
}
836 public async removeTaskFunction (name
: string): Promise
<boolean> {
837 if (!this.taskFunctions
.has(name
)) {
839 'Cannot remove a task function not handled on the pool side'
842 const opResult
= await this.sendTaskFunctionOperationToWorkers({
843 taskFunctionOperation
: 'remove',
844 taskFunctionName
: name
846 this.deleteTaskFunctionWorkerUsages(name
)
847 this.taskFunctions
.delete(name
)
/**
 * Lists the names of task functions available in the pool.
 *
 * @returns The task function names of the first worker node exposing a non-empty list, an empty array otherwise.
 */
public listTaskFunctionNames (): string[] {
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
/**
 * Sets the default task function in the pool by sending a `'default'` task
 * function operation to the workers.
 *
 * @param name - The name of the task function.
 * @returns The result of the task function operation on the workers.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return opResult
}
/**
 * Deletes the worker usage tied to the given task function on every worker node.
 *
 * @param name - The name of the task function.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
/**
 * Whether a task shall be executed right away on the given worker node
 * instead of being queued.
 *
 * That is the case when the worker node tasks queue is empty and its number
 * of executing tasks is below the tasks queue `concurrency` option.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed immediately, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing < this.opts.tasksQueueOptions!.concurrency!
}
888 public async execute (
891 transferList
?: TransferListItem
[]
892 ): Promise
<Response
> {
893 return await new Promise
<Response
>((resolve
, reject
) => {
895 reject(new Error('Cannot execute a task on not started pool'))
898 if (this.destroying
) {
899 reject(new Error('Cannot execute a task on destroying pool'))
902 if (name
!= null && typeof name
!== 'string') {
903 reject(new TypeError('name argument must be a string'))
908 typeof name
=== 'string' &&
909 name
.trim().length
=== 0
911 reject(new TypeError('name argument must not be an empty string'))
914 if (transferList
!= null && !Array.isArray(transferList
)) {
915 reject(new TypeError('transferList argument must be an array'))
918 const timestamp
= performance
.now()
919 const workerNodeKey
= this.chooseWorkerNode()
920 const task
: Task
<Data
> = {
921 name
: name
?? DEFAULT_TASK_NAME
,
922 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
923 data
: data
?? ({} as Data
),
928 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
929 this.promiseResponseMap
.set(task
.taskId
!, {
933 ...(this.emitter
!= null && {
934 asyncResource
: new AsyncResource('poolifier:task', {
935 triggerAsyncId
: this.emitter
.asyncId
,
936 requireManualDestroy
: true
941 this.opts
.enableTasksQueue
=== false ||
942 (this.opts
.enableTasksQueue
=== true &&
943 this.shallExecuteTask(workerNodeKey
))
945 this.executeTask(workerNodeKey
, task
)
947 this.enqueueTask(workerNodeKey
, task
)
953 * Starts the minimum number of workers.
955 private startMinimumNumberOfWorkers (): void {
957 this.workerNodes
.reduce(
958 (accumulator
, workerNode
) =>
959 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
961 ) < this.minimumNumberOfWorkers
963 this.createAndSetupWorkerNode()
968 public start (): void {
970 throw new Error('Cannot start an already started pool')
973 throw new Error('Cannot start an already starting pool')
975 if (this.destroying
) {
976 throw new Error('Cannot start a destroying pool')
979 this.startMinimumNumberOfWorkers()
980 this.starting
= false
985 public async destroy (): Promise
<void> {
987 throw new Error('Cannot destroy an already destroyed pool')
990 throw new Error('Cannot destroy an starting pool')
992 if (this.destroying
) {
993 throw new Error('Cannot destroy an already destroying pool')
995 this.destroying
= true
997 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
998 await this.destroyWorkerNode(workerNodeKey
)
1001 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
1002 this.emitter
?.emitDestroy()
1003 this.readyEventEmitted
= false
1004 this.destroying
= false
1005 this.started
= false
1008 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
1009 await new Promise
<void>((resolve
, reject
) => {
1010 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1011 if (this.workerNodes
[workerNodeKey
] == null) {
1015 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1016 this.checkMessageWorkerId(message
)
1017 if (message
.kill
=== 'success') {
1019 } else if (message
.kill
=== 'failure') {
1022 `Kill message handling failed on worker ${message.workerId}`
1027 // FIXME: should be registered only once
1028 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1029 this.sendToWorker(workerNodeKey
, { kill
: true })
1034 * Terminates the worker node given its worker node key.
1036 * @param workerNodeKey - The worker node key.
1038 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1039 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1040 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1041 const workerNode
= this.workerNodes
[workerNodeKey
]
1042 await waitWorkerNodeEvents(
1046 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1047 getDefaultTasksQueueOptions(
1048 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1049 ).tasksFinishedTimeout
1051 await this.sendKillMessageToWorker(workerNodeKey
)
1052 await workerNode
.terminate()
1056 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1057 * Can be overridden.
1061 protected setupHook (): void {
1062 /* Intentionally empty */
1066 * Should return whether the worker is the main worker or not.
1068 protected abstract isMain (): boolean
1071 * Hook executed before the worker task execution.
1072 * Can be overridden.
1074 * @param workerNodeKey - The worker node key.
1075 * @param task - The task to execute.
1077 protected beforeTaskExecutionHook (
1078 workerNodeKey
: number,
1081 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1082 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1083 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1084 ++workerUsage
.tasks
.executing
1085 updateWaitTimeWorkerUsage(
1086 this.workerChoiceStrategyContext
,
1092 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1093 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1094 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(task
.name
!) !=
1097 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1098 const taskFunctionWorkerUsage
= this.workerNodes
[
1100 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1101 ].getTaskFunctionWorkerUsage(task
.name
!)!
1102 ++taskFunctionWorkerUsage
.tasks
.executing
1103 updateWaitTimeWorkerUsage(
1104 this.workerChoiceStrategyContext
,
1105 taskFunctionWorkerUsage
,
1112 * Hook executed after the worker task execution.
1113 * Can be overridden.
1115 * @param workerNodeKey - The worker node key.
1116 * @param message - The received message.
1118 protected afterTaskExecutionHook (
1119 workerNodeKey
: number,
1120 message
: MessageValue
<Response
>
1122 let needWorkerChoiceStrategyUpdate
= false
1123 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1124 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1125 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1126 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1127 updateRunTimeWorkerUsage(
1128 this.workerChoiceStrategyContext
,
1132 updateEluWorkerUsage(
1133 this.workerChoiceStrategyContext
,
1137 needWorkerChoiceStrategyUpdate
= true
1140 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1141 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1142 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1143 message
.taskPerformance
!.name
1146 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1147 const taskFunctionWorkerUsage
= this.workerNodes
[
1149 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1150 ].getTaskFunctionWorkerUsage(message
.taskPerformance
!.name
)!
1151 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1152 updateRunTimeWorkerUsage(
1153 this.workerChoiceStrategyContext
,
1154 taskFunctionWorkerUsage
,
1157 updateEluWorkerUsage(
1158 this.workerChoiceStrategyContext
,
1159 taskFunctionWorkerUsage
,
1162 needWorkerChoiceStrategyUpdate
= true
1164 if (needWorkerChoiceStrategyUpdate
) {
1165 this.workerChoiceStrategyContext
?.update(workerNodeKey
)
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * The update is wanted when the worker info exists and lists more than two
 * task function names.
 * NOTE(review): the threshold of two presumably discounts entries that always
 * exist for the default task function — confirm against the worker side.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * When a dynamic worker shall be created, it is created first; its key is
 * returned directly only if the strategy policy flags `dynamicWorkerUsage`.
 * In every other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const useDynamicWorker =
      this.workerChoiceStrategyContext?.getStrategyPolicy()
        .dynamicWorkerUsage === true
    if (useDynamicWorker) {
      return dynamicWorkerNodeKey
    }
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return this.workerChoiceStrategyContext!.execute()
}
1206 * Conditions for dynamic worker creation.
1208 * @returns Whether to create a dynamic worker or not.
1210 protected abstract shallCreateDynamicWorker (): boolean
1213 * Sends a message to worker given its worker node key.
1215 * @param workerNodeKey - The worker node key.
1216 * @param message - The message.
1217 * @param transferList - The optional array of transferable objects.
1219 protected abstract sendToWorker (
1220 workerNodeKey
: number,
1221 message
: MessageValue
<Data
>,
1222 transferList
?: TransferListItem
[]
1226 * Creates a new, completely set up worker node.
1228 * @returns New, completely set up worker node key.
1230 protected createAndSetupWorkerNode (): number {
1231 const workerNode
= this.createWorkerNode()
1232 workerNode
.registerWorkerEventHandler(
1234 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1236 workerNode
.registerWorkerEventHandler(
1238 this.opts
.messageHandler
?? EMPTY_FUNCTION
1240 workerNode
.registerWorkerEventHandler(
1242 this.opts
.errorHandler
?? EMPTY_FUNCTION
1244 workerNode
.registerOnceWorkerEventHandler('error', (error
: Error) => {
1245 workerNode
.info
.ready
= false
1246 this.emitter
?.emit(PoolEvents
.error
, error
)
1250 this.opts
.restartWorkerOnError
=== true
1252 if (workerNode
.info
.dynamic
) {
1253 this.createAndSetupDynamicWorkerNode()
1255 this.createAndSetupWorkerNode()
1261 this.opts
.enableTasksQueue
=== true
1263 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
1265 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1266 workerNode
?.terminate().catch((error
: unknown
) => {
1267 this.emitter
?.emit(PoolEvents
.error
, error
)
1270 workerNode
.registerWorkerEventHandler(
1272 this.opts
.exitHandler
?? EMPTY_FUNCTION
1274 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1275 this.removeWorkerNode(workerNode
)
1276 if (this.started
&& !this.destroying
) {
1277 this.startMinimumNumberOfWorkers()
1280 const workerNodeKey
= this.addWorkerNode(workerNode
)
1281 this.afterWorkerNodeSetup(workerNodeKey
)
1282 return workerNodeKey
1286 * Creates a new, completely set up dynamic worker node.
1288 * @returns New, completely set up dynamic worker node key.
1290 protected createAndSetupDynamicWorkerNode (): number {
1291 const workerNodeKey
= this.createAndSetupWorkerNode()
1292 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1293 this.checkMessageWorkerId(message
)
1294 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1297 const workerUsage
= this.workerNodes
[localWorkerNodeKey
]?.usage
1298 // Kill message received from worker
1300 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1301 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1302 ((this.opts
.enableTasksQueue
=== false &&
1303 workerUsage
.tasks
.executing
=== 0) ||
1304 (this.opts
.enableTasksQueue
=== true &&
1305 workerUsage
.tasks
.executing
=== 0 &&
1306 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1308 // Flag the worker node as not ready immediately
1309 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1310 this.destroyWorkerNode(localWorkerNodeKey
).catch((error
: unknown
) => {
1311 this.emitter
?.emit(PoolEvents
.error
, error
)
1315 this.sendToWorker(workerNodeKey
, {
1318 if (this.taskFunctions
.size
> 0) {
1319 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1320 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1321 taskFunctionOperation
: 'add',
1323 taskFunction
: taskFunction
.toString()
1324 }).catch((error
: unknown
) => {
1325 this.emitter
?.emit(PoolEvents
.error
, error
)
1329 const workerNode
= this.workerNodes
[workerNodeKey
]
1330 workerNode
.info
.dynamic
= true
1332 this.workerChoiceStrategyContext
?.getStrategyPolicy()
1333 .dynamicWorkerReady
=== true ||
1334 this.workerChoiceStrategyContext
?.getStrategyPolicy()
1335 .dynamicWorkerUsage
=== true
1337 workerNode
.info
.ready
= true
1339 this.checkAndEmitDynamicWorkerCreationEvents()
1340 return workerNodeKey
1344 * Registers a listener callback on the worker given its worker node key.
1346 * @param workerNodeKey - The worker node key.
1347 * @param listener - The message listener callback.
1349 protected abstract registerWorkerMessageListener
<
1350 Message
extends Data
| Response
1352 workerNodeKey
: number,
1353 listener
: (message
: MessageValue
<Message
>) => void
1357 * Registers once a listener callback on the worker given its worker node key.
1359 * @param workerNodeKey - The worker node key.
1360 * @param listener - The message listener callback.
1362 protected abstract registerOnceWorkerMessageListener
<
1363 Message
extends Data
| Response
1365 workerNodeKey
: number,
1366 listener
: (message
: MessageValue
<Message
>) => void
1370 * Deregisters a listener callback on the worker given its worker node key.
1372 * @param workerNodeKey - The worker node key.
1373 * @param listener - The message listener callback.
1375 protected abstract deregisterWorkerMessageListener
<
1376 Message
extends Data
| Response
1378 workerNodeKey
: number,
1379 listener
: (message
: MessageValue
<Message
>) => void
1383 * Method hooked up after a worker node has been newly created.
1384 * Can be overridden.
1386 * @param workerNodeKey - The newly created worker node key.
1388 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1389 // Listen to worker messages.
1390 this.registerWorkerMessageListener(
1392 this.workerMessageListener
1394 // Send the startup message to worker.
1395 this.sendStartupMessageToWorker(workerNodeKey
)
1396 // Send the statistics message to worker.
1397 this.sendStatisticsMessageToWorker(workerNodeKey
)
1398 if (this.opts
.enableTasksQueue
=== true) {
1399 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1400 this.workerNodes
[workerNodeKey
].on(
1402 this.handleWorkerNodeIdleEvent
1405 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1406 this.workerNodes
[workerNodeKey
].on(
1408 this.handleWorkerNodeBackPressureEvent
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Called from the worker node setup hook right after the pool message
 * listener has been registered on the worker.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1422 * Sends the statistics message to worker given its worker node key.
1424 * @param workerNodeKey - The worker node key.
1426 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1427 this.sendToWorker(workerNodeKey
, {
1430 this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
1431 .runTime
.aggregate
?? false,
1433 this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements().elu
/**
 * Tells whether task stealing is currently pointless.
 *
 * @returns `true` when the pool has at most one worker node or reports no queued tasks.
 */
private cannotStealTask (): boolean {
  // With zero or one worker node there is nobody to steal from.
  if (this.workerNodes.length <= 1) {
    return true
  }
  // Nothing is queued anywhere in the pool: nothing to steal.
  return this.info.queuedTasks === 0
}
/**
 * Either executes the task right away on the given worker node or puts it
 * in that worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1451 private redistributeQueuedTasks (workerNodeKey
: number): void {
1452 if (workerNodeKey
=== -1 || this.cannotStealTask()) {
1455 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1456 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1457 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1458 return workerNode
.info
.ready
&&
1459 workerNode
.usage
.tasks
.queued
<
1460 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1467 destinationWorkerNodeKey
,
1468 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1469 this.dequeueTask(workerNodeKey
)!
1474 private updateTaskStolenStatisticsWorkerUsage (
1475 workerNodeKey
: number,
1478 const workerNode
= this.workerNodes
[workerNodeKey
]
1479 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1480 if (workerNode
?.usage
!= null) {
1481 ++workerNode
.usage
.tasks
.stolen
1484 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1485 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1487 const taskFunctionWorkerUsage
=
1488 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1489 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1490 ++taskFunctionWorkerUsage
.tasks
.stolen
/**
 * Increments the sequentially stolen tasks counter of the given worker node.
 * Does nothing when the worker node, or its usage, does not exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  workerUsage.tasks.sequentiallyStolen++
}
1504 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1505 workerNodeKey
: number,
1508 const workerNode
= this.workerNodes
[workerNodeKey
]
1510 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1511 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1513 const taskFunctionWorkerUsage
=
1514 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1515 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1516 ++taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
/**
 * Resets to zero the sequentially stolen tasks counter of the given worker node.
 * Does nothing when the worker node, or its usage, does not exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  workerUsage.tasks.sequentiallyStolen = 0
}
1530 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1531 workerNodeKey
: number,
1534 const workerNode
= this.workerNodes
[workerNodeKey
]
1536 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1537 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1539 const taskFunctionWorkerUsage
=
1540 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1541 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1542 taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
= 0
1546 private readonly handleWorkerNodeIdleEvent
= (
1547 eventDetail
: WorkerNodeEventDetail
,
1548 previousStolenTask
?: Task
<Data
>
1550 const { workerNodeKey
} = eventDetail
1551 if (workerNodeKey
== null) {
1553 "WorkerNode event detail 'workerNodeKey' property must be defined"
1556 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1558 this.cannotStealTask() ||
1559 (this.info
.stealingWorkerNodes
?? 0) >
1560 Math.floor(this.workerNodes
.length
/ 2)
1562 if (workerInfo
!= null && previousStolenTask
!= null) {
1563 workerInfo
.stealing
= false
1567 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1569 workerInfo
!= null &&
1570 previousStolenTask
!= null &&
1571 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1572 (workerNodeTasksUsage
.executing
> 0 ||
1573 this.tasksQueueSize(workerNodeKey
) > 0)
1575 workerInfo
.stealing
= false
1576 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1577 for (const taskName
of workerInfo
.taskFunctionNames
!) {
1578 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1583 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1586 if (workerInfo
== null) {
1588 `Worker node with key '${workerNodeKey}' not found in pool`
1591 workerInfo
.stealing
= true
1592 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1594 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1597 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1598 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1600 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1601 ].getTaskFunctionWorkerUsage(stolenTask
.name
!)!.tasks
1603 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1604 (previousStolenTask
!= null &&
1605 previousStolenTask
.name
=== stolenTask
.name
&&
1606 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1608 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1610 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1614 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1616 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1621 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1623 this.handleWorkerNodeIdleEvent(eventDetail
, stolenTask
)
1626 .catch((error
: unknown
) => {
1627 this.emitter
?.emit(PoolEvents
.error
, error
)
/**
 * Steals one task from another worker node on behalf of the given worker node.
 *
 * Candidate source worker nodes are examined from the most to the least
 * queued tasks. The first one that is ready, not itself stealing, distinct
 * from the destination worker node and has at least one queued task is used.
 * The task taken from it is handled on the destination worker node and the
 * destination stolen tasks statistics are updated.
 *
 * @param workerNodeKey - The key of the worker node that receives the stolen task.
 * @returns The stolen task, or `undefined` if no eligible source worker node exists.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  // Sort a copy so that the pool worker nodes order (hence keys) is untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  const sourceWorkerNode = workerNodes.find(
    sourceWorkerNode =>
      sourceWorkerNode.info.ready &&
      !sourceWorkerNode.info.stealing &&
      // Positions in the sorted copy are not pool worker node keys, so the
      // destination worker node must be excluded by identity, not by index.
      sourceWorkerNode !== destinationWorkerNode &&
      sourceWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode != null) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const task = sourceWorkerNode.popTask()!
    this.handleTask(workerNodeKey, task)
    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
    return task
  }
}
/**
 * Handles a worker node back pressure event by moving queued tasks from the
 * worker node that raised the event to other worker nodes.
 *
 * Nothing is done when stealing is not possible, when more than half of the
 * worker nodes (rounded down) are already stealing, or when the configured
 * tasks queue size is not greater than the size offset.
 *
 * Otherwise, worker nodes are visited from the least to the most queued
 * tasks. Each ready, non stealing worker node other than the source, whose
 * queue is below the configured size minus the offset, receives one task
 * from the source for as long as the source still has queued tasks.
 *
 * @param eventDetail - The worker node event detail, carrying the id of the worker under back pressure.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If a selected worker node is not found in the pool.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sort a copy so that the pool worker nodes order (hence keys) is untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      // Positions in the sorted copy are not pool worker node keys: resolve
      // the real key before addressing the worker node through the pool.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      workerInfo.stealing = true
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      workerInfo.stealing = false
    }
  }
}
/**
 * This method is the message listener registered on each worker.
 *
 * After checking the message worker id, the message is dispatched on the
 * fields it carries: a worker ready response, a task execution response or
 * a task function names update.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames == null) {
    return
  }
  // Task function names message received from worker
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo != null) {
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
/**
 * Emits the pool ready event, with the pool information, the first time the
 * pool is found ready, and records that the event has been emitted.
 */
private checkAndEmitReadyEvent (): void {
  // Already emitted, or pool not ready yet: nothing to do.
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
/**
 * Handles the ready response sent by a worker: stores the ready flag and the
 * task function names on the worker node information, then checks whether
 * the pool ready event shall be emitted.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker did not report itself as ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready == null || !ready) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.info.ready = ready
  workerNode.info.taskFunctionNames = taskFunctionNames
  this.checkAndEmitReadyEvent()
}
1753 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1754 const { workerId
, taskId
, workerError
, data
} = message
1755 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1756 const promiseResponse
= this.promiseResponseMap
.get(taskId
!)
1757 if (promiseResponse
!= null) {
1758 const { resolve
, reject
, workerNodeKey
, asyncResource
} = promiseResponse
1759 const workerNode
= this.workerNodes
[workerNodeKey
]
1760 if (workerError
!= null) {
1761 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1762 asyncResource
!= null
1763 ? asyncResource
.runInAsyncScope(
1768 : reject(workerError
.message
)
1770 asyncResource
!= null
1771 ? asyncResource
.runInAsyncScope(resolve
, this.emitter
, data
)
1772 : resolve(data
as Response
)
1774 asyncResource
?.emitDestroy()
1775 this.afterTaskExecutionHook(workerNodeKey
, message
)
1776 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1777 this.promiseResponseMap
.delete(taskId
!)
1778 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1779 workerNode
?.emit('taskFinished', taskId
)
1781 this.opts
.enableTasksQueue
=== true &&
1783 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1786 const workerNodeTasksUsage
= workerNode
.usage
.tasks
1788 this.tasksQueueSize(workerNodeKey
) > 0 &&
1789 workerNodeTasksUsage
.executing
<
1790 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1791 this.opts
.tasksQueueOptions
!.concurrency
!
1793 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1794 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
1797 workerNodeTasksUsage
.executing
=== 0 &&
1798 this.tasksQueueSize(workerNodeKey
) === 0 &&
1799 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1801 workerNode
.emit('idle', {
1810 private checkAndEmitTaskExecutionEvents (): void {
1812 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
/**
 * Emits the pool back pressure event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
/**
 * Emits dynamic worker creation events.
 *
 * Invoked once a dynamic worker node has been created and set up.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` if no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  return workerNode?.info
}
1838 * Creates a worker node.
1840 * @returns The created worker node.
1842 private createWorkerNode (): IWorkerNode
<Worker
, Data
> {
1843 const workerNode
= new WorkerNode
<Worker
, Data
>(
1848 workerOptions
: this.opts
.workerOptions
,
1849 tasksQueueBackPressureSize
:
1850 this.opts
.tasksQueueOptions
?.size
??
1851 getDefaultTasksQueueOptions(
1852 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1856 // Flag the worker node as ready at pool startup.
1857 if (this.starting
) {
1858 workerNode
.info
.ready
= true
/**
 * Adds the given worker node in the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  // The key of a worker node is its position in the pool worker nodes.
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1879 private checkAndEmitEmptyEvent (): void {
1881 this.emitter
?.emit(PoolEvents
.empty
, this.info
)
1882 this.readyEventEmitted
= false
/**
 * Removes the worker node from the pool worker nodes.
 *
 * When the worker node is found, it is also removed from the worker choice
 * strategy context. The pool empty event check is performed in all cases.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const found = removedWorkerNodeKey >= 0
  if (found) {
    this.workerNodes.splice(removedWorkerNodeKey, 1)
    this.workerChoiceStrategyContext?.remove(removedWorkerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
/**
 * Flags the worker node at the given key as not ready.
 * Does nothing when no worker information exists for that key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const info = this.getWorkerInfo(workerNodeKey)
  if (info == null) {
    return
  }
  info.ready = false
}
1907 private hasBackPressure (): boolean {
1909 this.opts
.enableTasksQueue
=== true &&
1910 this.workerNodes
.findIndex(
1911 workerNode
=> !workerNode
.hasBackPressure()
/**
 * Executes the given task on the worker given its worker node key.
 *
 * The before task execution hook runs first, then the task is sent to the
 * worker along with its transfer list, and finally the task execution
 * events check is performed.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // Read the transfer list only once the hook has run, as the original does.
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node at the given key, then performs
 * the task queuing events check.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
/**
 * Flushes the tasks queue of the worker node at the given key: every queued
 * task is dequeued and executed on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks: number
  for (
    flushedTasks = 0;
    this.tasksQueueSize(workerNodeKey) > 0;
    flushedTasks++
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const task = this.dequeueTask(workerNodeKey)!
    this.executeTask(workerNodeKey, task)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
/**
 * Flushes the tasks queue of every worker node of the pool, in key order.
 */
private flushTasksQueues (): void {
  // The length is re-read on each iteration, like the live array iterator.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}