1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
5 import { AsyncResource
} from
'node:async_hooks'
8 PromiseResponseWrapper
,
10 } from
'../utility-types.js'
24 import { KillBehaviors
} from
'../worker/worker-options.js'
25 import type { TaskFunction
} from
'../worker/task-functions.js'
33 type TasksQueueOptions
39 WorkerNodeEventDetail
,
44 WorkerChoiceStrategies
,
45 type WorkerChoiceStrategy
,
46 type WorkerChoiceStrategyOptions
47 } from
'./selection-strategies/selection-strategies-types.js'
48 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
49 import { version
} from
'./version.js'
50 import { WorkerNode
} from
'./worker-node.js'
53 checkValidTasksQueueOptions
,
54 checkValidWorkerChoiceStrategy
,
55 getDefaultTasksQueueOptions
,
57 updateRunTimeWorkerUsage
,
58 updateTaskStatisticsWorkerUsage
,
59 updateWaitTimeWorkerUsage
,
64 * Base class that implements some shared logic for all poolifier pools.
66 * @typeParam Worker - Type of worker which manages this pool.
67 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
68 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
70 export abstract class AbstractPool
<
71 Worker
extends IWorker
,
74 > implements IPool
<Worker
, Data
, Response
> {
76 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
79 public emitter
?: EventEmitterAsyncResource
82 * The task execution response promise map:
83 * - `key`: The message id of each submitted task.
84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
88 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
89 new Map
<string, PromiseResponseWrapper
<Response
>>()
92 * Worker choice strategy context referencing a worker choice algorithm implementation.
94 protected workerChoiceStrategyContext
?: WorkerChoiceStrategyContext
<
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
105 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
108 * Whether the pool is started or not.
110 private started
: boolean
112 * Whether the pool is starting or not.
114 private starting
: boolean
116 * Whether the pool is destroying or not.
118 private destroying
: boolean
120 * Whether the pool ready event has been emitted or not.
122 private readyEventEmitted
: boolean
124 * The start timestamp of the pool.
126 private readonly startTimestamp
129 * Constructs a new poolifier pool.
131 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
132 * @param filePath - Path to the worker file.
133 * @param opts - Options for the pool.
134 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
137 protected readonly minimumNumberOfWorkers
: number,
138 protected readonly filePath
: string,
139 protected readonly opts
: PoolOptions
<Worker
>,
140 protected readonly maximumNumberOfWorkers
?: number
142 if (!this.isMain()) {
144 'Cannot start a pool from a worker with the same type as the pool'
148 checkFilePath(this.filePath
)
149 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
150 this.checkPoolOptions(this.opts
)
152 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
153 this.executeTask
= this.executeTask
.bind(this)
154 this.enqueueTask
= this.enqueueTask
.bind(this)
156 if (this.opts
.enableEvents
=== true) {
157 this.initializeEventEmitter()
159 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
165 this.opts
.workerChoiceStrategy
,
166 this.opts
.workerChoiceStrategyOptions
171 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
174 this.starting
= false
175 this.destroying
= false
176 this.readyEventEmitted
= false
177 if (this.opts
.startWorkers
=== true) {
181 this.startTimestamp
= performance
.now()
184 private checkPoolType (): void {
185 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
187 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
192 private checkMinimumNumberOfWorkers (
193 minimumNumberOfWorkers
: number | undefined
195 if (minimumNumberOfWorkers
== null) {
197 'Cannot instantiate a pool without specifying the number of workers'
199 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
201 'Cannot instantiate a pool with a non safe integer number of workers'
203 } else if (minimumNumberOfWorkers
< 0) {
204 throw new RangeError(
205 'Cannot instantiate a pool with a negative number of workers'
207 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
208 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
/**
 * Validates the given pool options and writes the effective values on `this.opts`.
 *
 * Defaults applied when an option is unset: `startWorkers` and
 * `restartWorkerOnError` and `enableEvents` default to `true`,
 * `workerChoiceStrategy` defaults to `WorkerChoiceStrategies.ROUND_ROBIN`
 * and `enableTasksQueue` defaults to `false`. Tasks queue options are only
 * checked and built when the tasks queue ends up enabled.
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  // Reject anything that is not a plain object before reading any option.
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true
  // The strategy is validated before its default is applied.
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(opts.workerChoiceStrategyOptions)
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    // Tasks queue disabled: its options are neither checked nor built.
    return
  }
  checkValidTasksQueueOptions(opts.tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions
  )
}
238 private checkValidWorkerChoiceStrategyOptions (
239 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
242 workerChoiceStrategyOptions
!= null &&
243 !isPlainObject(workerChoiceStrategyOptions
)
246 'Invalid worker choice strategy options: must be a plain object'
250 workerChoiceStrategyOptions
?.weights
!= null &&
251 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
252 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
255 'Invalid worker choice strategy options: must have a weight for each worker node'
259 workerChoiceStrategyOptions
?.measurement
!= null &&
260 !Object.values(Measurements
).includes(
261 workerChoiceStrategyOptions
.measurement
265 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
/**
 * Creates the pool event emitter and stores it in `this.emitter`.
 *
 * The async resource name is built from the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
277 public get
info (): PoolInfo
{
282 started
: this.started
,
284 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
285 strategy
: this.opts
.workerChoiceStrategy
!,
286 minSize
: this.minimumNumberOfWorkers
,
287 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
288 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
289 ?.runTime
.aggregate
=== true &&
290 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
291 .waitTime
.aggregate
&& {
292 utilization
: round(this.utilization
)
294 workerNodes
: this.workerNodes
.length
,
295 idleWorkerNodes
: this.workerNodes
.reduce(
296 (accumulator
, workerNode
) =>
297 workerNode
.usage
.tasks
.executing
=== 0
302 ...(this.opts
.enableTasksQueue
=== true && {
303 stealingWorkerNodes
: this.workerNodes
.reduce(
304 (accumulator
, workerNode
) =>
305 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
309 busyWorkerNodes
: this.workerNodes
.reduce(
310 (accumulator
, _workerNode
, workerNodeKey
) =>
311 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
314 executedTasks
: this.workerNodes
.reduce(
315 (accumulator
, workerNode
) =>
316 accumulator
+ workerNode
.usage
.tasks
.executed
,
319 executingTasks
: this.workerNodes
.reduce(
320 (accumulator
, workerNode
) =>
321 accumulator
+ workerNode
.usage
.tasks
.executing
,
324 ...(this.opts
.enableTasksQueue
=== true && {
325 queuedTasks
: this.workerNodes
.reduce(
326 (accumulator
, workerNode
) =>
327 accumulator
+ workerNode
.usage
.tasks
.queued
,
331 ...(this.opts
.enableTasksQueue
=== true && {
332 maxQueuedTasks
: this.workerNodes
.reduce(
333 (accumulator
, workerNode
) =>
334 accumulator
+ (workerNode
.usage
.tasks
.maxQueued
?? 0),
338 ...(this.opts
.enableTasksQueue
=== true && {
339 backPressure
: this.hasBackPressure()
341 ...(this.opts
.enableTasksQueue
=== true && {
342 stolenTasks
: this.workerNodes
.reduce(
343 (accumulator
, workerNode
) =>
344 accumulator
+ workerNode
.usage
.tasks
.stolen
,
348 failedTasks
: this.workerNodes
.reduce(
349 (accumulator
, workerNode
) =>
350 accumulator
+ workerNode
.usage
.tasks
.failed
,
353 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
354 ?.runTime
.aggregate
=== true && {
358 ...this.workerNodes
.map(
359 workerNode
=> workerNode
.usage
.runTime
.minimum
?? Infinity
365 ...this.workerNodes
.map(
366 workerNode
=> workerNode
.usage
.runTime
.maximum
?? -Infinity
370 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
371 .runTime
.average
&& {
374 this.workerNodes
.reduce
<number[]>(
375 (accumulator
, workerNode
) =>
376 accumulator
.concat(workerNode
.usage
.runTime
.history
),
382 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
386 this.workerNodes
.reduce
<number[]>(
387 (accumulator
, workerNode
) =>
388 accumulator
.concat(workerNode
.usage
.runTime
.history
),
396 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
397 ?.waitTime
.aggregate
=== true && {
401 ...this.workerNodes
.map(
402 workerNode
=> workerNode
.usage
.waitTime
.minimum
?? Infinity
408 ...this.workerNodes
.map(
409 workerNode
=> workerNode
.usage
.waitTime
.maximum
?? -Infinity
413 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
414 .waitTime
.average
&& {
417 this.workerNodes
.reduce
<number[]>(
418 (accumulator
, workerNode
) =>
419 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
425 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
426 .waitTime
.median
&& {
429 this.workerNodes
.reduce
<number[]>(
430 (accumulator
, workerNode
) =>
431 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
443 * The pool readiness boolean status.
445 private get
ready (): boolean {
450 this.workerNodes
.reduce(
451 (accumulator
, workerNode
) =>
452 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
456 ) >= this.minimumNumberOfWorkers
/**
 * The pool emptiness boolean status.
 *
 * The pool is empty when its minimum number of workers is zero and it
 * currently holds no worker node.
 */
protected get empty (): boolean {
  if (this.minimumNumberOfWorkers !== 0) {
    return false
  }
  return this.workerNodes.length === 0
}
/**
 * The approximate pool utilization.
 *
 * Computed as the sum of the aggregated tasks run time and wait time of all
 * worker nodes, divided by the pool time capacity, i.e. the time elapsed
 * since `startTimestamp` multiplied by the maximum number of workers (or the
 * minimum number of workers when no maximum is defined).
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity =
    elapsedTime * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    // A missing aggregate counts as zero.
    totalTasksRunTime += usage.runTime.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
492 * If it is `'dynamic'`, it provides the `max` property.
494 protected abstract get
type (): PoolType
499 protected abstract get
worker (): WorkerType
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * A worker id is valid when it is defined and matches one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  if (message.workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  return this.workerNodes.findIndex(({ info }) => info.id === workerId)
}
/**
 * Sets the worker choice strategy used by the pool.
 *
 * The strategy is validated, stored in the pool options and forwarded to the
 * worker choice strategy context when there is one. Afterwards every worker
 * node gets its usage reset and is sent a statistics message.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @param workerChoiceStrategyOptions - The optional worker choice strategy options, applied when defined.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
/**
 * Sets the worker choice strategy options.
 *
 * The options are validated first; when defined they replace the ones stored
 * in the pool options. The worker choice strategy context, if any, is then
 * given the options currently stored in the pool options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  if (workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  const strategyContext = this.workerChoiceStrategyContext
  if (strategyContext != null) {
    strategyContext.setOptions(this.opts.workerChoiceStrategyOptions)
  }
}
/**
 * Enables or disables the worker nodes tasks queue.
 *
 * When a previously enabled tasks queue gets disabled, task stealing and
 * tasks stealing on back pressure are unset and the tasks queues are flushed
 * before the new state is stored. The tasks queue options are then (re)set.
 *
 * @param enable - Whether to enable the tasks queue or not.
 * @param tasksQueueOptions - The optional tasks queue options.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
/**
 * Sets the worker nodes tasks queue options.
 *
 * When the tasks queue is disabled, any stored tasks queue options are
 * deleted. Otherwise the options are validated, merged with the defaults,
 * the queue size is propagated to the worker nodes and the stealing
 * listeners are refreshed according to the effective options.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const queueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = queueOptions
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(queueOptions.size!)
  // Listeners are always removed first so enabling never registers them twice.
  const taskStealing = queueOptions.taskStealing === true
  this.unsetTaskStealing()
  if (taskStealing) {
    this.setTaskStealing()
  }
  const tasksStealingOnBackPressure =
    queueOptions.tasksStealingOnBackPressure === true
  this.unsetTasksStealingOnBackPressure()
  if (tasksStealingOnBackPressure) {
    this.setTasksStealingOnBackPressure()
  }
}
/**
 * Builds the effective tasks queue options.
 *
 * The default tasks queue options are computed from the maximum number of
 * workers (or the minimum number of workers when no maximum is defined) and
 * are overridden by the given options.
 *
 * @param tasksQueueOptions - The tasks queue options to merge over the defaults.
 * @returns The effective tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const poolMaxSize = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(poolMaxSize)
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The value assigned to each worker node `tasksQueueBackPressureSize`.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
/**
 * Registers the worker node idle event handler as `'idle'` listener on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
625 private unsetTaskStealing (): void {
626 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
627 this.workerNodes
[workerNodeKey
].off(
629 this.handleWorkerNodeIdleEvent
634 private setTasksStealingOnBackPressure (): void {
635 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
636 this.workerNodes
[workerNodeKey
].on(
638 this.handleWorkerNodeBackPressureEvent
643 private unsetTasksStealingOnBackPressure (): void {
644 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
645 this.workerNodes
[workerNodeKey
].off(
647 this.handleWorkerNodeBackPressureEvent
/**
 * Whether the pool is full or not.
 *
 * The pool filling boolean status: `true` when the number of worker nodes
 * has reached the maximum number of workers, or the minimum number of
 * workers when no maximum is defined.
 */
protected get full (): boolean {
  const maximumSize = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= maximumSize
}
665 * Whether the pool is busy or not.
667 * The pool busyness boolean status.
669 protected abstract get
busy (): boolean
672 * Whether worker nodes are executing concurrently their tasks quota or not.
674 * @returns Worker nodes busyness boolean status.
676 protected internalBusy (): boolean {
677 if (this.opts
.enableTasksQueue
=== true) {
679 this.workerNodes
.findIndex(
681 workerNode
.info
.ready
&&
682 workerNode
.usage
.tasks
.executing
<
683 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
684 this.opts
.tasksQueueOptions
!.concurrency
!
689 this.workerNodes
.findIndex(
691 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
/**
 * Whether the worker node is busy or not.
 *
 * With the tasks queue enabled, a worker node is busy when its number of
 * executing tasks has reached the tasks queue concurrency. Otherwise it is
 * busy as soon as it executes at least one task.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executingTasks >= this.opts.tasksQueueOptions!.concurrency!
}
707 private async sendTaskFunctionOperationToWorker (
708 workerNodeKey
: number,
709 message
: MessageValue
<Data
>
710 ): Promise
<boolean> {
711 return await new Promise
<boolean>((resolve
, reject
) => {
712 const taskFunctionOperationListener
= (
713 message
: MessageValue
<Response
>
715 this.checkMessageWorkerId(message
)
716 const workerId
= this.getWorkerInfo(workerNodeKey
)?.id
718 message
.taskFunctionOperationStatus
!= null &&
719 message
.workerId
=== workerId
721 if (message
.taskFunctionOperationStatus
) {
726 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
730 this.deregisterWorkerMessageListener(
731 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
732 taskFunctionOperationListener
736 this.registerWorkerMessageListener(
738 taskFunctionOperationListener
740 this.sendToWorker(workerNodeKey
, message
)
744 private async sendTaskFunctionOperationToWorkers (
745 message
: MessageValue
<Data
>
746 ): Promise
<boolean> {
747 return await new Promise
<boolean>((resolve
, reject
) => {
748 const responsesReceived
= new Array<MessageValue
<Response
>>()
749 const taskFunctionOperationsListener
= (
750 message
: MessageValue
<Response
>
752 this.checkMessageWorkerId(message
)
753 if (message
.taskFunctionOperationStatus
!= null) {
754 responsesReceived
.push(message
)
755 if (responsesReceived
.length
=== this.workerNodes
.length
) {
757 responsesReceived
.every(
758 message
=> message
.taskFunctionOperationStatus
=== true
763 responsesReceived
.some(
764 message
=> message
.taskFunctionOperationStatus
=== false
767 const errorResponse
= responsesReceived
.find(
768 response
=> response
.taskFunctionOperationStatus
=== false
772 `Task function operation '${
773 message.taskFunctionOperation as string
774 }' failed on worker ${errorResponse?.workerId} with error: '${
775 errorResponse?.workerError?.message
780 this.deregisterWorkerMessageListener(
781 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
782 taskFunctionOperationsListener
787 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
788 this.registerWorkerMessageListener(
790 taskFunctionOperationsListener
792 this.sendToWorker(workerNodeKey
, message
)
/**
 * Whether at least one worker node reports the given task function name.
 *
 * @param name - The task function name.
 * @returns `true` if a worker node lists the task function name, `false` otherwise.
 */
public hasTaskFunction (name: string): boolean {
  return this.workerNodes.some(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.includes(name)
  )
}
/**
 * Adds a task function to the pool workers.
 *
 * The arguments are validated, an `'add'` task function operation carrying
 * the stringified function is sent to the workers and, once it settled
 * successfully, the function is recorded in the pool side task functions map.
 *
 * @param name - The task function name.
 * @param fn - The task function.
 * @returns The task function operation result.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the name is not a string or is an empty string, or if `fn` is not a function.
 */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // name is known to be a string from here on.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  this.taskFunctions.set(name, fn)
  return opResult
}
834 public async removeTaskFunction (name
: string): Promise
<boolean> {
835 if (!this.taskFunctions
.has(name
)) {
837 'Cannot remove a task function not handled on the pool side'
840 const opResult
= await this.sendTaskFunctionOperationToWorkers({
841 taskFunctionOperation
: 'remove',
842 taskFunctionName
: name
844 this.deleteTaskFunctionWorkerUsages(name
)
845 this.taskFunctions
.delete(name
)
/**
 * Lists the task function names known by the pool workers.
 *
 * The names come from the first worker node reporting a non-empty task
 * function names array.
 *
 * @returns The task function names, or an empty array when no worker node reports any.
 */
public listTaskFunctionNames (): string[] {
  const workerNode = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return workerNode?.info.taskFunctionNames ?? []
}
/**
 * Sends a `'default'` task function operation for the given task function name to the pool workers.
 *
 * @param name - The task function name.
 * @returns The task function operation result.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operationMessage)
}
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
/**
 * Whether a task shall be executed right away on the worker node or not.
 *
 * That is the case when the worker node tasks queue is empty and its number
 * of executing tasks is below the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed immediately, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing < this.opts.tasksQueueOptions!.concurrency!
}
886 public async execute (
889 transferList
?: TransferListItem
[]
890 ): Promise
<Response
> {
891 return await new Promise
<Response
>((resolve
, reject
) => {
893 reject(new Error('Cannot execute a task on not started pool'))
896 if (this.destroying
) {
897 reject(new Error('Cannot execute a task on destroying pool'))
900 if (name
!= null && typeof name
!== 'string') {
901 reject(new TypeError('name argument must be a string'))
906 typeof name
=== 'string' &&
907 name
.trim().length
=== 0
909 reject(new TypeError('name argument must not be an empty string'))
912 if (transferList
!= null && !Array.isArray(transferList
)) {
913 reject(new TypeError('transferList argument must be an array'))
916 const timestamp
= performance
.now()
917 const workerNodeKey
= this.chooseWorkerNode()
918 const task
: Task
<Data
> = {
919 name
: name
?? DEFAULT_TASK_NAME
,
920 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
921 data
: data
?? ({} as Data
),
926 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
927 this.promiseResponseMap
.set(task
.taskId
!, {
931 ...(this.emitter
!= null && {
932 asyncResource
: new AsyncResource('poolifier:task', {
933 triggerAsyncId
: this.emitter
.asyncId
,
934 requireManualDestroy
: true
939 this.opts
.enableTasksQueue
=== false ||
940 (this.opts
.enableTasksQueue
=== true &&
941 this.shallExecuteTask(workerNodeKey
))
943 this.executeTask(workerNodeKey
, task
)
945 this.enqueueTask(workerNodeKey
, task
)
951 public start (): void {
953 throw new Error('Cannot start an already started pool')
956 throw new Error('Cannot start an already starting pool')
958 if (this.destroying
) {
959 throw new Error('Cannot start a destroying pool')
963 this.workerNodes
.reduce(
964 (accumulator
, workerNode
) =>
965 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
967 ) < this.minimumNumberOfWorkers
969 this.createAndSetupWorkerNode()
971 this.starting
= false
976 public async destroy (): Promise
<void> {
978 throw new Error('Cannot destroy an already destroyed pool')
981 throw new Error('Cannot destroy an starting pool')
983 if (this.destroying
) {
984 throw new Error('Cannot destroy an already destroying pool')
986 this.destroying
= true
988 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
989 await this.destroyWorkerNode(workerNodeKey
)
992 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
993 this.emitter
?.emitDestroy()
994 this.emitter
?.removeAllListeners()
995 this.readyEventEmitted
= false
996 this.destroying
= false
1000 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
1001 await new Promise
<void>((resolve
, reject
) => {
1002 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1003 if (this.workerNodes
[workerNodeKey
] == null) {
1007 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1008 this.checkMessageWorkerId(message
)
1009 if (message
.kill
=== 'success') {
1011 } else if (message
.kill
=== 'failure') {
1014 `Kill message handling failed on worker ${message.workerId}`
1019 // FIXME: should be registered only once
1020 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1021 this.sendToWorker(workerNodeKey
, { kill
: true })
1026 * Terminates the worker node given its worker node key.
1028 * @param workerNodeKey - The worker node key.
1030 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1031 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1032 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1033 const workerNode
= this.workerNodes
[workerNodeKey
]
1034 await waitWorkerNodeEvents(
1038 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1039 getDefaultTasksQueueOptions(
1040 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1041 ).tasksFinishedTimeout
1043 await this.sendKillMessageToWorker(workerNodeKey
)
1044 await workerNode
.terminate()
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden.
 */
protected setupHook (): void {
  // Intentionally empty
}
1058 * Should return whether the worker is the main worker or not.
1060 protected abstract isMain (): boolean
1063 * Hook executed before the worker task execution.
1064 * Can be overridden.
1066 * @param workerNodeKey - The worker node key.
1067 * @param task - The task to execute.
1069 protected beforeTaskExecutionHook (
1070 workerNodeKey
: number,
1073 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1074 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1075 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1076 ++workerUsage
.tasks
.executing
1077 updateWaitTimeWorkerUsage(
1078 this.workerChoiceStrategyContext
,
1084 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1085 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1086 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(task
.name
!) !=
1089 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1090 const taskFunctionWorkerUsage
= this.workerNodes
[
1092 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1093 ].getTaskFunctionWorkerUsage(task
.name
!)!
1094 ++taskFunctionWorkerUsage
.tasks
.executing
1095 updateWaitTimeWorkerUsage(
1096 this.workerChoiceStrategyContext
,
1097 taskFunctionWorkerUsage
,
1104 * Hook executed after the worker task execution.
1105 * Can be overridden.
1107 * @param workerNodeKey - The worker node key.
1108 * @param message - The received message.
1110 protected afterTaskExecutionHook (
1111 workerNodeKey
: number,
1112 message
: MessageValue
<Response
>
1114 let needWorkerChoiceStrategyUpdate
= false
1115 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1116 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1117 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1118 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1119 updateRunTimeWorkerUsage(
1120 this.workerChoiceStrategyContext
,
1124 updateEluWorkerUsage(
1125 this.workerChoiceStrategyContext
,
1129 needWorkerChoiceStrategyUpdate
= true
1132 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1133 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1134 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1135 message
.taskPerformance
!.name
1138 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1139 const taskFunctionWorkerUsage
= this.workerNodes
[
1141 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1142 ].getTaskFunctionWorkerUsage(message
.taskPerformance
!.name
)!
1143 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1144 updateRunTimeWorkerUsage(
1145 this.workerChoiceStrategyContext
,
1146 taskFunctionWorkerUsage
,
1149 updateEluWorkerUsage(
1150 this.workerChoiceStrategyContext
,
1151 taskFunctionWorkerUsage
,
1154 needWorkerChoiceStrategyUpdate
= true
1156 if (needWorkerChoiceStrategyUpdate
) {
1157 this.workerChoiceStrategyContext
?.update(workerNodeKey
)
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * That is the case when the worker info is known and lists more than two
 * task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1177 * Chooses a worker node for the next task.
1179 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1181 * @returns The chosen worker node key
1183 private chooseWorkerNode (): number {
1184 if (this.shallCreateDynamicWorker()) {
1185 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1187 this.workerChoiceStrategyContext
?.getStrategyPolicy()
1188 .dynamicWorkerUsage
=== true
1190 return workerNodeKey
1193 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1194 return this.workerChoiceStrategyContext
!.execute()
1198 * Conditions for dynamic worker creation.
1200 * @returns Whether to create a dynamic worker or not.
1202 protected abstract shallCreateDynamicWorker (): boolean
1205 * Sends a message to worker given its worker node key.
1207 * @param workerNodeKey - The worker node key.
1208 * @param message - The message.
1209 * @param transferList - The optional array of transferable objects.
1211 protected abstract sendToWorker (
1212 workerNodeKey
: number,
1213 message
: MessageValue
<Data
>,
1214 transferList
?: TransferListItem
[]
/**
 * Creates a worker node, wires its lifecycle event handlers and adds it to the pool.
 *
 * Handlers are registered in this order: the user `online`, `message` and
 * `error` handlers, the pool internal `error` handler, the user `exit`
 * handler and finally a one-shot `exit` handler removing the worker node
 * from the pool.
 *
 * @returns The key of the newly created and completely set up worker node.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  // Evaluated lazily: the pool state is read each time it is needed.
  const isPoolRunning = (): boolean => this.started && !this.destroying

  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler('error', (error: Error) => {
    // The failed worker must not be chosen for new tasks anymore.
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (isPoolRunning() && this.opts.restartWorkerOnError === true) {
      // Replace the failed worker by a worker of the same kind.
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (isPoolRunning() && this.opts.enableTasksQueue === true) {
      // Hand the tasks still queued on the failed worker to other workers.
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.terminate().catch(terminateError => {
      this.emitter?.emit(PoolEvents.error, terminateError)
    })
  })
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', () => {
    this.removeWorkerNode(workerNode)
  })

  const createdWorkerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(createdWorkerNodeKey)
  return createdWorkerNodeKey
}
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a message listener destroys the worker node
 * when it sends a kill message, the worker is asked to check its activity,
 * the pool task functions are sent to it and it is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    // The key is resolved again from the worker id of the received message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
    // A soft kill is only honoured once the worker has nothing left to do.
    const isSoftKillOfIdleWorker = (): boolean => {
      if (!isKillBehavior(KillBehaviors.SOFT, message.kill)) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return workerUsage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          workerUsage.tasks.executing === 0 &&
          this.tasksQueueSize(localWorkerNodeKey) === 0
        )
      }
      return false
    }
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      isSoftKillOfIdleWorker()
    ) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  if (this.taskFunctions.size > 0) {
    for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
      this.sendTaskFunctionOperationToWorker(workerNodeKey, {
        taskFunctionOperation: 'add',
        taskFunctionName,
        taskFunction: taskFunction.toString()
      }).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  }
  const dynamicWorkerNode = this.workerNodes[workerNodeKey]
  dynamicWorkerNode.info.dynamic = true
  const flagAsReady =
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerReady === true ||
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerUsage === true
  if (flagAsReady) {
    dynamicWorkerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Registers once a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Deregisters a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup
 * and statistics messages and, when the tasks queue is enabled, subscribes
 * to the worker node events driving tasks stealing.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Tasks stealing only makes sense with a tasks queue.
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleWorkerNodeBackPressureEvent
    )
  }
}
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Called once per worker node right after its message listener is registered.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries, for the run time and the ELU, whether the task
 * statistics requirements of the worker choice strategy context ask for
 * aggregation; both default to `false` when no requirement is available.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const aggregateRunTime =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.runTime
      .aggregate ?? false
  const aggregateElu =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu
      .aggregate ?? false
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: aggregateRunTime,
      elu: aggregateElu
    }
  })
}
/**
 * Tells whether tasks stealing is pointless in the current pool state.
 *
 * @returns `true` if the pool has at most one worker node or no queued task.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
/**
 * Executes the task on the worker node right away if allowed, enqueues it otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
/**
 * Moves the tasks queued on the given worker node to the other worker nodes.
 *
 * Each task is handed to the ready worker node, other than the source one,
 * having the fewest queued tasks. The source worker node is never chosen as
 * destination: re-enqueuing a task on it would keep its queue non-empty and
 * prevent the loop from terminating. When no other ready worker node
 * exists, the redistribution stops.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const destinationWorkerNodeKey = this.workerNodes.reduce(
      (minWorkerNodeKey, workerNode, candidateWorkerNodeKey, workerNodes) => {
        if (
          candidateWorkerNodeKey === workerNodeKey ||
          !workerNode.info.ready
        ) {
          return minWorkerNodeKey
        }
        // -1 means that no eligible destination has been found yet.
        return minWorkerNodeKey === -1 ||
          workerNode.usage.tasks.queued <
            workerNodes[minWorkerNodeKey].usage.tasks.queued
          ? candidateWorkerNodeKey
          : minWorkerNodeKey
      },
      -1
    )
    if (destinationWorkerNodeKey === -1) {
      // No other ready worker node can take over the queued tasks.
      return
    }
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(workerNodeKey)!
    )
  }
}
/**
 * Increments the stolen tasks counters of the given worker node.
 *
 * The worker node usage is always updated when present; the task function
 * usage is only updated when task function worker usage shall be tracked
 * for the worker node and a usage exists for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (stealingWorkerNode?.usage != null) {
    stealingWorkerNode.usage.tasks.stolen += 1
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const usage = stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)!
  usage.tasks.stolen += 1
}
/**
 * Increments the sequentially stolen tasks counter of the given worker node, if it has a usage.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (stealingWorkerNode?.usage == null) {
    return
  }
  stealingWorkerNode.usage.tasks.sequentiallyStolen += 1
}
/**
 * Increments the sequentially stolen tasks counter of a task function on the given worker node.
 *
 * Nothing is done unless task function worker usage shall be tracked for
 * the worker node and a usage exists for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const usage = stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)!
  usage.tasks.sequentiallyStolen += 1
}
/**
 * Resets to zero the sequentially stolen tasks counter of the given worker node, if it has a usage.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (stealingWorkerNode?.usage == null) {
    return
  }
  stealingWorkerNode.usage.tasks.sequentiallyStolen = 0
}
/**
 * Resets to zero the sequentially stolen tasks counter of a task function on the given worker node.
 *
 * Nothing is done unless task function worker usage shall be tracked for
 * the worker node and a usage exists for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const usage = stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)!
  usage.tasks.sequentiallyStolen = 0
}
/**
 * Handles the `idle` event of a worker node by letting it steal queued tasks.
 *
 * The handler re-invokes itself, after a delay derived from the number of
 * sequentially stolen tasks, with the task it has just stolen. The chain
 * stops when stealing is not possible, when too many worker nodes are
 * already stealing, or when the worker node got work of its own.
 *
 * @param eventDetail - The worker node event detail; `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker node key is undefined or unknown to the pool.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  const stealingNotAllowed =
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  if (stealingNotAllowed) {
    // End of a stealing chain: release the stealing flag.
    if (workerInfo != null && previousStolenTask != null) {
      workerInfo.stealing = false
    }
    return
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  if (
    workerInfo != null &&
    previousStolenTask != null &&
    tasksUsage.sequentiallyStolen > 0 &&
    (tasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    // The worker node is no longer idle: stop stealing and reset the counters.
    workerInfo.stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const taskName of this.workerNodes[workerNodeKey].info
      .taskFunctionNames!) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  if (workerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  workerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionTasksUsage = this.workerNodes[
      workerNodeKey
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
    // The sequence goes on if it starts now or if the same task function is stolen again.
    const sequenceContinues =
      taskFunctionTasksUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksUsage.sequentiallyStolen > 0)
    if (sequenceContinues) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    }
  }
  sleep(exponentialDelay(tasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
/**
 * Makes the given worker node steal one task from the most loaded eligible worker node.
 *
 * Candidates are examined by decreasing number of queued tasks. A candidate
 * is eligible when it is ready, not stealing itself, has queued tasks and is
 * not the stealing worker node. The stealing worker node is excluded by
 * object identity: positions in the sorted copy do not match the pool
 * worker node keys, so comparing them would exclude the wrong node.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, or `undefined` if no eligible source worker node exists.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  const sourceWorkerNode = workerNodes.find(
    candidateWorkerNode =>
      candidateWorkerNode.info.ready &&
      !candidateWorkerNode.info.stealing &&
      candidateWorkerNode !== stealingWorkerNode &&
      candidateWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode != null) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const task = sourceWorkerNode.popTask()!
    this.handleTask(workerNodeKey, task)
    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
    return task
  }
}
/**
 * Handles the `backPressure` event of a worker node by letting less loaded worker nodes steal its tasks.
 *
 * Worker nodes are examined by increasing number of queued tasks. Each
 * eligible one (ready, not stealing, not the source and with room left in
 * its queue) takes one task from the source worker node. The pool key of
 * each destination is resolved from the pool worker nodes array, since
 * positions in the sorted copy do not match the pool worker node keys.
 *
 * @param eventDetail - The worker node event detail carrying the id of the worker under back pressure.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If a destination worker node is not found in the pool.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      // Real key of the destination worker node in the pool.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      workerInfo.stealing = true
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      workerInfo.stealing = false
    }
  }
}
/**
 * This method is the message listener registered on each worker.
 *
 * It dispatches the message according to the fields it carries: a worker
 * ready response, a task execution response or a task function names update.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames == null) {
    return
  }
  // Task function names message received from worker
  const workerInfo = this.getWorkerInfo(
    this.getWorkerNodeKeyByWorkerId(workerId)
  )
  if (workerInfo != null) {
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
/**
 * Emits the pool `ready` event the first time the pool is ready.
 *
 * The emission is recorded so that the event is not emitted again.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
/**
 * Handles a worker ready response: flags the worker node as ready and stores its task function names.
 *
 * @param message - The ready response received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready == null || !ready) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const { info } = this.workerNodes[readyWorkerNodeKey]
  info.ready = ready
  info.taskFunctionNames = taskFunctionNames
  this.checkAndEmitReadyEvent()
}
/**
 * Handles a task execution response: settles the task promise and keeps the worker node busy.
 *
 * The promise bound to the task id is rejected with the worker error
 * message or resolved with the response data, within the task async
 * resource scope when there is one. When the tasks queue is enabled and the
 * pool is not being destroyed, the next queued task is executed if the
 * concurrency allows it, and the `idle` event is emitted on the worker node
 * once it has nothing left to do.
 *
 * Responses without a pending promise are ignored. If the worker node that
 * ran the task is no longer in the pool, the promise is still settled but
 * no worker node event is emitted and no queued task is processed.
 *
 * @param message - The task execution response received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    asyncResource != null
      ? asyncResource.runInAsyncScope(
        reject,
        this.emitter,
        workerError.message
      )
      : reject(workerError.message)
  } else {
    asyncResource != null
      ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
      : resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  // The worker node may have been removed from the pool in the meantime.
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode == null) {
    return
  }
  workerNode.emit('taskFinished', taskId)
  if (this.opts.enableTasksQueue === true && !this.destroying) {
    const workerNodeTasksUsage = workerNode.usage.tasks
    if (
      this.tasksQueueSize(workerNodeKey) > 0 &&
      workerNodeTasksUsage.executing <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.concurrency!
    ) {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
    }
    if (
      workerNodeTasksUsage.executing === 0 &&
      this.tasksQueueSize(workerNodeKey) === 0 &&
      workerNodeTasksUsage.sequentiallyStolen === 0
    ) {
      workerNode.emit('idle', {
        workerId: workerId as number,
        workerNodeKey
      })
    }
  }
}
/**
 * Emits the pool `busy` event, with the pool information, if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
/**
 * Emits the pool `backPressure` event, with the pool information, if the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
/**
 * Emits dynamic worker creation events.
 *
 * Called at the end of the dynamic worker node creation and setup.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` if no worker node exists at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  return workerNode?.info
}
/**
 * Creates a worker node.
 *
 * The tasks queue back pressure size is taken from the tasks queue options
 * when defined, from the default tasks queue options otherwise. A worker
 * node created while the pool is starting is flagged as ready right away.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize
    }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
/**
 * Adds the given worker node in the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
/**
 * Emits the pool `empty` event if the pool is empty.
 *
 * The ready event emission record is cleared at the same time.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
/**
 * Removes the worker node from the pool worker nodes.
 *
 * When the worker node is found, it is also removed from the worker choice
 * strategy context. The empty pool check runs in every case.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const found = removedWorkerNodeKey !== -1
  if (found) {
    this.workerNodes.splice(removedWorkerNodeKey, 1)
    this.workerChoiceStrategyContext?.remove(removedWorkerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
/**
 * Flags the worker node at the given key as not ready, if it exists.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
/**
 * Tells whether the pool has back pressure.
 *
 * @returns `true` if the tasks queue is enabled and no worker node is free of back pressure.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
/**
 * Executes the given task on the worker given its worker node key.
 *
 * Runs the before task execution hook, sends the task with its transfer
 * list to the worker, then checks whether task execution events shall be
 * emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node, then checks whether task queuing events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if there is none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
1938 private flushTasksQueues (): void {
1939 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1940 this.flushTasksQueue(workerNodeKey
)