1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
5 import { AsyncResource
} from
'node:async_hooks'
8 PromiseResponseWrapper
,
10 } from
'../utility-types.js'
24 import { KillBehaviors
} from
'../worker/worker-options.js'
25 import type { TaskFunction
} from
'../worker/task-functions.js'
33 type TasksQueueOptions
39 WorkerNodeEventDetail
,
44 WorkerChoiceStrategies
,
45 type WorkerChoiceStrategy
,
46 type WorkerChoiceStrategyOptions
47 } from
'./selection-strategies/selection-strategies-types.js'
48 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
49 import { version
} from
'./version.js'
50 import { WorkerNode
} from
'./worker-node.js'
53 checkValidTasksQueueOptions
,
54 checkValidWorkerChoiceStrategy
,
55 getDefaultTasksQueueOptions
,
57 updateRunTimeWorkerUsage
,
58 updateTaskStatisticsWorkerUsage
,
59 updateWaitTimeWorkerUsage
,
64 * Base class that implements some shared logic for all poolifier pools.
66 * @typeParam Worker - Type of worker which manages this pool.
67 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
68 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
70 export abstract class AbstractPool
<
71 Worker
extends IWorker
,
74 > implements IPool
<Worker
, Data
, Response
> {
76 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
79 public emitter
?: EventEmitterAsyncResource
82 * The task execution response promise map:
83 * - `key`: The message id of each submitted task.
84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
88 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
89 new Map
<string, PromiseResponseWrapper
<Response
>>()
92 * Worker choice strategy context referencing a worker choice algorithm implementation.
94 protected workerChoiceStrategyContext
?: WorkerChoiceStrategyContext
<
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
105 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
108 * Whether the pool is started or not.
110 private started
: boolean
112 * Whether the pool is starting or not.
114 private starting
: boolean
116 * Whether the pool is destroying or not.
118 private destroying
: boolean
120 * Whether the pool ready event has been emitted or not.
122 private readyEventEmitted
: boolean
124 * The start timestamp of the pool.
126 private readonly startTimestamp
129 * Constructs a new poolifier pool.
131 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
132 * @param filePath - Path to the worker file.
133 * @param opts - Options for the pool.
134 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
137 protected readonly minimumNumberOfWorkers
: number,
138 protected readonly filePath
: string,
139 protected readonly opts
: PoolOptions
<Worker
>,
140 protected readonly maximumNumberOfWorkers
?: number
142 if (!this.isMain()) {
144 'Cannot start a pool from a worker with the same type as the pool'
148 checkFilePath(this.filePath
)
149 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
150 this.checkPoolOptions(this.opts
)
152 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
153 this.executeTask
= this.executeTask
.bind(this)
154 this.enqueueTask
= this.enqueueTask
.bind(this)
156 if (this.opts
.enableEvents
=== true) {
157 this.initializeEventEmitter()
159 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
165 this.opts
.workerChoiceStrategy
,
166 this.opts
.workerChoiceStrategyOptions
171 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
174 this.starting
= false
175 this.destroying
= false
176 this.readyEventEmitted
= false
177 if (this.opts
.startWorkers
=== true) {
181 this.startTimestamp
= performance
.now()
184 private checkPoolType (): void {
185 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
187 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
192 private checkMinimumNumberOfWorkers (
193 minimumNumberOfWorkers
: number | undefined
195 if (minimumNumberOfWorkers
== null) {
197 'Cannot instantiate a pool without specifying the number of workers'
199 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
201 'Cannot instantiate a pool with a non safe integer number of workers'
203 } else if (minimumNumberOfWorkers
< 0) {
204 throw new RangeError(
205 'Cannot instantiate a pool with a negative number of workers'
207 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
208 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
/**
 * Validates the given pool options and stores the effective values,
 * including defaults, on `this.opts`.
 *
 * @param opts - The pool options to check.
 * @throws TypeError If `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  // Reject anything that is not a plain object before touching any option.
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validate first, then default to round robin.
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  // Worker choice strategy options are only stored when provided.
  this.checkValidWorkerChoiceStrategyOptions(opts.workerChoiceStrategyOptions)
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is on.
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions
    )
  }
}
238 private checkValidWorkerChoiceStrategyOptions (
239 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
242 workerChoiceStrategyOptions
!= null &&
243 !isPlainObject(workerChoiceStrategyOptions
)
246 'Invalid worker choice strategy options: must be a plain object'
250 workerChoiceStrategyOptions
?.weights
!= null &&
251 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
252 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
255 'Invalid worker choice strategy options: must have a weight for each worker node'
259 workerChoiceStrategyOptions
?.measurement
!= null &&
260 !Object.values(Measurements
).includes(
261 workerChoiceStrategyOptions
.measurement
265 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
/**
 * Creates the pool event emitter, named after the pool type and worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
277 public get
info (): PoolInfo
{
282 started
: this.started
,
284 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
285 strategy
: this.opts
.workerChoiceStrategy
!,
286 strategyRetries
: this.workerChoiceStrategyContext
?.retriesCount
?? 0,
287 minSize
: this.minimumNumberOfWorkers
,
288 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
289 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
290 .runTime
.aggregate
=== true &&
291 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
292 .waitTime
.aggregate
&& {
293 utilization
: round(this.utilization
)
295 workerNodes
: this.workerNodes
.length
,
296 idleWorkerNodes
: this.workerNodes
.reduce(
297 (accumulator
, workerNode
) =>
298 workerNode
.usage
.tasks
.executing
=== 0
303 ...(this.opts
.enableTasksQueue
=== true && {
304 stealingWorkerNodes
: this.workerNodes
.reduce(
305 (accumulator
, workerNode
) =>
306 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
310 busyWorkerNodes
: this.workerNodes
.reduce(
311 (accumulator
, _workerNode
, workerNodeKey
) =>
312 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
315 executedTasks
: this.workerNodes
.reduce(
316 (accumulator
, workerNode
) =>
317 accumulator
+ workerNode
.usage
.tasks
.executed
,
320 executingTasks
: this.workerNodes
.reduce(
321 (accumulator
, workerNode
) =>
322 accumulator
+ workerNode
.usage
.tasks
.executing
,
325 ...(this.opts
.enableTasksQueue
=== true && {
326 queuedTasks
: this.workerNodes
.reduce(
327 (accumulator
, workerNode
) =>
328 accumulator
+ workerNode
.usage
.tasks
.queued
,
332 ...(this.opts
.enableTasksQueue
=== true && {
333 maxQueuedTasks
: this.workerNodes
.reduce(
334 (accumulator
, workerNode
) =>
335 accumulator
+ (workerNode
.usage
.tasks
.maxQueued
?? 0),
339 ...(this.opts
.enableTasksQueue
=== true && {
340 backPressure
: this.hasBackPressure()
342 ...(this.opts
.enableTasksQueue
=== true && {
343 stolenTasks
: this.workerNodes
.reduce(
344 (accumulator
, workerNode
) =>
345 accumulator
+ workerNode
.usage
.tasks
.stolen
,
349 failedTasks
: this.workerNodes
.reduce(
350 (accumulator
, workerNode
) =>
351 accumulator
+ workerNode
.usage
.tasks
.failed
,
354 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
355 .runTime
.aggregate
=== true && {
359 ...this.workerNodes
.map(
360 workerNode
=> workerNode
.usage
.runTime
.minimum
?? Infinity
366 ...this.workerNodes
.map(
367 workerNode
=> workerNode
.usage
.runTime
.maximum
?? -Infinity
371 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
372 .runTime
.average
&& {
375 this.workerNodes
.reduce
<number[]>(
376 (accumulator
, workerNode
) =>
377 accumulator
.concat(workerNode
.usage
.runTime
.history
),
383 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
387 this.workerNodes
.reduce
<number[]>(
388 (accumulator
, workerNode
) =>
389 accumulator
.concat(workerNode
.usage
.runTime
.history
),
397 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
398 .waitTime
.aggregate
=== true && {
402 ...this.workerNodes
.map(
403 workerNode
=> workerNode
.usage
.waitTime
.minimum
?? Infinity
409 ...this.workerNodes
.map(
410 workerNode
=> workerNode
.usage
.waitTime
.maximum
?? -Infinity
414 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
415 .waitTime
.average
&& {
418 this.workerNodes
.reduce
<number[]>(
419 (accumulator
, workerNode
) =>
420 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
426 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
427 .waitTime
.median
&& {
430 this.workerNodes
.reduce
<number[]>(
431 (accumulator
, workerNode
) =>
432 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
444 * The pool readiness boolean status.
446 private get
ready (): boolean {
451 this.workerNodes
.reduce(
452 (accumulator
, workerNode
) =>
453 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
457 ) >= this.minimumNumberOfWorkers
462 * The pool emptiness boolean status.
/**
 * Whether the pool is empty: its minimum number of workers is zero and it
 * currently holds no worker node.
 */
protected get empty (): boolean {
  if (this.minimumNumberOfWorkers !== 0) {
    return false
  }
  return this.workerNodes.length === 0
}
469 * The approximate pool utilization.
471 * @returns The pool utilization.
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time
 * of all worker nodes, divided by the pool time capacity (time elapsed since
 * the start timestamp multiplied by the pool size).
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolSize = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    // A missing aggregate counts as zero.
    totalTasksRunTime += workerNode.usage.runTime.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / (elapsedTime * poolSize)
}
493 * If it is `'dynamic'`, it provides the `max` property.
495 protected abstract get
type (): PoolType
500 protected abstract get
worker (): WorkerType
503 * Checks if the worker id sent in the received message from a worker is valid.
505 * @param message - The received message.
506 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
/**
 * Checks that the received message carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws Error If the worker id is missing or matches no worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // -1 means no worker node carries this id.
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
519 * Gets the worker node key given its worker id.
521 * @param workerId - The worker id.
522 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
/**
 * Looks up the worker node key (index in `workerNodes`) of the worker with the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key, or `-1` when no worker node has that id.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    if (this.workerNodes[workerNodeKey].info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
/**
 * Switches the pool to the given worker choice strategy, optionally applies
 * new strategy options, then resets the usage of every worker node and
 * sends each one a statistics message.
 *
 * @param workerChoiceStrategy - The worker choice strategy to use.
 * @param workerChoiceStrategyOptions - Optional worker choice strategy options.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
/**
 * Validates the given worker choice strategy options, stores them when
 * defined, and forwards the currently stored options to the strategy context.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // `undefined`/`null` leaves the previously stored options untouched.
  const hasNewOptions = workerChoiceStrategyOptions != null
  if (hasNewOptions) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  const effectiveOptions = this.opts.workerChoiceStrategyOptions
  this.workerChoiceStrategyContext?.setOptions(effectiveOptions)
}
/**
 * Enables or disables the worker node tasks queue.
 * When an enabled queue is turned off, task stealing handlers are removed and
 * the tasks queues are flushed before the new setting is applied.
 *
 * @param enable - Whether to enable the tasks queue or not.
 * @param tasksQueueOptions - Optional tasks queue options.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
/**
 * Applies the given tasks queue options.
 * With the tasks queue enabled, the options are validated, merged with the
 * defaults, and the queue size and stealing handlers are (re)installed.
 * With the tasks queue disabled, any stored tasks queue options are removed.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const effectiveOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = effectiveOptions
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(effectiveOptions.size!)
  // Handlers are always removed first so they are never registered twice.
  this.unsetTaskStealing()
  if (effectiveOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (effectiveOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
/**
 * Builds the effective tasks queue options: the defaults computed for the
 * pool size, overridden by the given options.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  // NOTE(review): the override spread is reconstructed from an elided line — confirm against the full file.
  const poolSize = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(poolSize)
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The value assigned to each worker node `tasksQueueBackPressureSize`.
 */
private setTasksQueueSize (size: number): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].tasksQueueBackPressureSize = size
  }
}
/**
 * Registers the worker node idle event handler on the `'idle'` event of every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
/**
 * Removes the worker node idle event handler from the `'idle'` event of every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
635 private setTasksStealingOnBackPressure (): void {
636 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
637 this.workerNodes
[workerNodeKey
].on(
639 this.handleWorkerNodeBackPressureEvent
644 private unsetTasksStealingOnBackPressure (): void {
645 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
646 this.workerNodes
[workerNodeKey
].off(
648 this.handleWorkerNodeBackPressureEvent
654 * Whether the pool is full or not.
656 * The pool filling boolean status.
/**
 * Whether the pool is full: the number of worker nodes has reached the
 * maximum number of workers (or the minimum when no maximum is defined).
 */
protected get full (): boolean {
  const capacity = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= capacity
}
666 * Whether the pool is busy or not.
668 * The pool busyness boolean status.
670 protected abstract get
busy (): boolean
673 * Whether worker nodes are executing concurrently their tasks quota or not.
675 * @returns Worker nodes busyness boolean status.
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 * The pool is busy when no ready worker node has spare capacity: fewer
 * executing tasks than the queue concurrency when the tasks queue is enabled,
 * no executing task at all otherwise.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            this.opts.tasksQueueOptions!.concurrency!
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
/**
 * Whether the given worker node is busy: it executes at least `concurrency`
 * tasks when the tasks queue is enabled, at least one task otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return this.opts.enableTasksQueue === true
    ? // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    executingTasks >= this.opts.tasksQueueOptions!.concurrency!
    : executingTasks > 0
}
708 private async sendTaskFunctionOperationToWorker (
709 workerNodeKey
: number,
710 message
: MessageValue
<Data
>
711 ): Promise
<boolean> {
712 return await new Promise
<boolean>((resolve
, reject
) => {
713 const taskFunctionOperationListener
= (
714 message
: MessageValue
<Response
>
716 this.checkMessageWorkerId(message
)
717 const workerId
= this.getWorkerInfo(workerNodeKey
)?.id
719 message
.taskFunctionOperationStatus
!= null &&
720 message
.workerId
=== workerId
722 if (message
.taskFunctionOperationStatus
) {
727 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
731 this.deregisterWorkerMessageListener(
732 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
733 taskFunctionOperationListener
737 this.registerWorkerMessageListener(
739 taskFunctionOperationListener
741 this.sendToWorker(workerNodeKey
, message
)
745 private async sendTaskFunctionOperationToWorkers (
746 message
: MessageValue
<Data
>
747 ): Promise
<boolean> {
748 return await new Promise
<boolean>((resolve
, reject
) => {
749 const responsesReceived
= new Array<MessageValue
<Response
>>()
750 const taskFunctionOperationsListener
= (
751 message
: MessageValue
<Response
>
753 this.checkMessageWorkerId(message
)
754 if (message
.taskFunctionOperationStatus
!= null) {
755 responsesReceived
.push(message
)
756 if (responsesReceived
.length
=== this.workerNodes
.length
) {
758 responsesReceived
.every(
759 message
=> message
.taskFunctionOperationStatus
=== true
764 responsesReceived
.some(
765 message
=> message
.taskFunctionOperationStatus
=== false
768 const errorResponse
= responsesReceived
.find(
769 response
=> response
.taskFunctionOperationStatus
=== false
773 `Task function operation '${
774 message.taskFunctionOperation as string
775 }' failed on worker ${errorResponse?.workerId} with error: '${
776 errorResponse?.workerError?.message
781 this.deregisterWorkerMessageListener(
782 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
783 taskFunctionOperationsListener
788 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
789 this.registerWorkerMessageListener(
791 taskFunctionOperationsListener
793 this.sendToWorker(workerNodeKey
, message
)
/**
 * Whether at least one worker node lists the given task function name.
 *
 * @param name - The task function name.
 * @returns `true` if a worker node has the task function name, `false` otherwise.
 */
public hasTaskFunction (name: string): boolean {
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
/**
 * Adds a task function to the pool: sends an `'add'` task function operation
 * carrying the function source text to the worker nodes, then records the
 * function in the pool side task functions map.
 *
 * @param name - The task function name.
 * @param fn - The task function.
 * @returns The result of the task function operation.
 * @throws TypeError If `name` is not a non-empty string or `fn` is not a function.
 */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // `name` is known to be a string here, so only emptiness is left to check.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  const operationSucceeded = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  this.taskFunctions.set(name, fn)
  return operationSucceeded
}
835 public async removeTaskFunction (name
: string): Promise
<boolean> {
836 if (!this.taskFunctions
.has(name
)) {
838 'Cannot remove a task function not handled on the pool side'
841 const opResult
= await this.sendTaskFunctionOperationToWorkers({
842 taskFunctionOperation
: 'remove',
843 taskFunctionName
: name
845 this.deleteTaskFunctionWorkerUsages(name
)
846 this.taskFunctions
.delete(name
)
/**
 * Lists the task function names known to the pool, taken from the first
 * worker node that reports a non-empty list.
 *
 * @returns The task function names, or an empty array when no worker node reports any.
 */
public listTaskFunctionNames (): string[] {
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
/**
 * Sends a `'default'` task function operation for the given name to the worker nodes.
 *
 * @param name - The name of the task function to use as default.
 * @returns The result of the task function operation.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperation: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(defaultOperation)
}
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].deleteTaskFunctionWorkerUsage(name)
  }
}
/**
 * Whether a task shall be executed right away on the given worker node
 * instead of being enqueued: its tasks queue is empty and it executes fewer
 * tasks than the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed immediately, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executingTasks < this.opts.tasksQueueOptions!.concurrency!
}
887 public async execute (
890 transferList
?: TransferListItem
[]
891 ): Promise
<Response
> {
892 return await new Promise
<Response
>((resolve
, reject
) => {
894 reject(new Error('Cannot execute a task on not started pool'))
897 if (this.destroying
) {
898 reject(new Error('Cannot execute a task on destroying pool'))
901 if (name
!= null && typeof name
!== 'string') {
902 reject(new TypeError('name argument must be a string'))
907 typeof name
=== 'string' &&
908 name
.trim().length
=== 0
910 reject(new TypeError('name argument must not be an empty string'))
913 if (transferList
!= null && !Array.isArray(transferList
)) {
914 reject(new TypeError('transferList argument must be an array'))
917 const timestamp
= performance
.now()
918 const workerNodeKey
= this.chooseWorkerNode()
919 const task
: Task
<Data
> = {
920 name
: name
?? DEFAULT_TASK_NAME
,
921 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
922 data
: data
?? ({} as Data
),
927 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
928 this.promiseResponseMap
.set(task
.taskId
!, {
932 ...(this.emitter
!= null && {
933 asyncResource
: new AsyncResource('poolifier:task', {
934 triggerAsyncId
: this.emitter
.asyncId
,
935 requireManualDestroy
: true
940 this.opts
.enableTasksQueue
=== false ||
941 (this.opts
.enableTasksQueue
=== true &&
942 this.shallExecuteTask(workerNodeKey
))
944 this.executeTask(workerNodeKey
, task
)
946 this.enqueueTask(workerNodeKey
, task
)
/**
 * Starts the pool: creates and sets up worker nodes until the number of
 * non-dynamic worker nodes reaches the minimum number of workers.
 *
 * The `starting` flag is always cleared, even when a worker node creation
 * throws, so a failed start can be retried and does not block `destroy()`.
 *
 * @throws Error If the pool is already started, starting or destroying.
 */
public start (): void {
  // NOTE(review): the `started`/`starting` guards and the final `started = true`
  // are reconstructed from the error messages — confirm against the full file.
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  try {
    // Recount on every iteration: each creation adds a worker node.
    while (
      this.workerNodes.reduce(
        (accumulator, workerNode) =>
          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
        0
      ) < this.minimumNumberOfWorkers
    ) {
      this.createAndSetupWorkerNode()
    }
    this.started = true
  } finally {
    this.starting = false
  }
}
/**
 * Destroys the pool: destroys every worker node concurrently, emits the pool
 * destroy event, tears down the event emitter and marks the pool as not started.
 *
 * The `destroying` flag is always cleared, even when a worker node destruction
 * rejects, so a failed destroy can be retried instead of leaving the pool
 * permanently flagged as destroying.
 *
 * @throws Error If the pool is already destroyed, starting or destroying.
 */
public async destroy (): Promise<void> {
  // NOTE(review): the `started`/`starting` guards and the final `started = false`
  // are reconstructed from the error messages — confirm against the full file.
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy a starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  try {
    await Promise.all(
      this.workerNodes.map(async (_workerNode, workerNodeKey) => {
        await this.destroyWorkerNode(workerNodeKey)
      })
    )
    // Emit the destroy event before the emitter itself is torn down.
    this.emitter?.emit(PoolEvents.destroy, this.info)
    this.emitter?.emitDestroy()
    this.emitter?.removeAllListeners()
    this.readyEventEmitted = false
    this.started = false
  } finally {
    this.destroying = false
  }
}
1001 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
1002 await new Promise
<void>((resolve
, reject
) => {
1003 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1004 if (this.workerNodes
[workerNodeKey
] == null) {
1008 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1009 this.checkMessageWorkerId(message
)
1010 if (message
.kill
=== 'success') {
1012 } else if (message
.kill
=== 'failure') {
1015 `Kill message handling failed on worker ${message.workerId}`
1020 // FIXME: should be registered only once
1021 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1022 this.sendToWorker(workerNodeKey
, { kill
: true })
1027 * Terminates the worker node given its worker node key.
1029 * @param workerNodeKey - The worker node key.
1031 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1032 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1033 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1034 const workerNode
= this.workerNodes
[workerNodeKey
]
1035 await waitWorkerNodeEvents(
1039 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1040 getDefaultTasksQueueOptions(
1041 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1042 ).tasksFinishedTimeout
1044 await this.sendKillMessageToWorker(workerNodeKey
)
1045 await workerNode
.terminate()
1049 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1050 * Can be overridden.
/**
 * Setup hook, empty by default; subclasses can override it.
 */
protected setupHook (): void {
  // Intentionally empty
}
1059 * Should return whether the worker is the main worker or not.
1061 protected abstract isMain (): boolean
1064 * Hook executed before the worker task execution.
1065 * Can be overridden.
1067 * @param workerNodeKey - The worker node key.
1068 * @param task - The task to execute.
1070 protected beforeTaskExecutionHook (
1071 workerNodeKey
: number,
1074 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1075 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1076 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1077 ++workerUsage
.tasks
.executing
1078 updateWaitTimeWorkerUsage(
1079 this.workerChoiceStrategyContext
,
1085 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1086 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1087 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(task
.name
!) !=
1090 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1091 const taskFunctionWorkerUsage
= this.workerNodes
[
1093 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1094 ].getTaskFunctionWorkerUsage(task
.name
!)!
1095 ++taskFunctionWorkerUsage
.tasks
.executing
1096 updateWaitTimeWorkerUsage(
1097 this.workerChoiceStrategyContext
,
1098 taskFunctionWorkerUsage
,
1105 * Hook executed after the worker task execution.
1106 * Can be overridden.
1108 * @param workerNodeKey - The worker node key.
1109 * @param message - The received message.
1111 protected afterTaskExecutionHook (
1112 workerNodeKey
: number,
1113 message
: MessageValue
<Response
>
1115 let needWorkerChoiceStrategyUpdate
= false
1116 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1117 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1118 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1119 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1120 updateRunTimeWorkerUsage(
1121 this.workerChoiceStrategyContext
,
1125 updateEluWorkerUsage(
1126 this.workerChoiceStrategyContext
,
1130 needWorkerChoiceStrategyUpdate
= true
1133 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1134 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1135 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1136 message
.taskPerformance
!.name
1139 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1140 const taskFunctionWorkerUsage
= this.workerNodes
[
1142 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1143 ].getTaskFunctionWorkerUsage(message
.taskPerformance
!.name
)!
1144 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1145 updateRunTimeWorkerUsage(
1146 this.workerChoiceStrategyContext
,
1147 taskFunctionWorkerUsage
,
1150 updateEluWorkerUsage(
1151 this.workerChoiceStrategyContext
,
1152 taskFunctionWorkerUsage
,
1155 needWorkerChoiceStrategyUpdate
= true
1157 if (needWorkerChoiceStrategyUpdate
) {
1158 this.workerChoiceStrategyContext
?.update(workerNodeKey
)
1163 * Whether the worker node shall update its task function worker usage or not.
1165 * @param workerNodeKey - The worker node key.
1166 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
/**
 * Whether the worker node shall update its task function worker usage or not:
 * its worker info must list more than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  if (!Array.isArray(taskFunctionNames)) {
    return false
  }
  return taskFunctionNames.length > 2
}
1178 * Chooses a worker node for the next task.
1180 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1182 * @returns The chosen worker node key
/**
 * Chooses a worker node for the next task.
 * When a dynamic worker shall be created, it is created first and returned
 * directly if the strategy policy flags dynamic worker usage; in every other
 * case the worker choice strategy picks the worker node.
 *
 * @returns The chosen worker node key.
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const strategyPolicy = this.workerChoiceStrategyContext?.getStrategyPolicy()
    if (strategyPolicy?.dynamicWorkerUsage === true) {
      return dynamicWorkerNodeKey
    }
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return this.workerChoiceStrategyContext!.execute()
}
1199 * Conditions for dynamic worker creation.
1201 * @returns Whether to create a dynamic worker or not.
1203 protected abstract shallCreateDynamicWorker (): boolean
1206 * Sends a message to worker given its worker node key.
1208 * @param workerNodeKey - The worker node key.
1209 * @param message - The message.
1210 * @param transferList - The optional array of transferable objects.
1212 protected abstract sendToWorker (
1213 workerNodeKey
: number,
1214 message
: MessageValue
<Data
>,
1215 transferList
?: TransferListItem
[]
/**
 * Creates a worker node, registers its worker event handlers and adds it to the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  const isPoolRunning = (): boolean => this.started && !this.destroying
  // Pool side reaction to a worker error: flag, notify, optionally restart and
  // redistribute queued tasks, then terminate the faulty worker node.
  const onWorkerError = (error: Error): void => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (isPoolRunning() && this.opts.restartWorkerOnError === true) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (isPoolRunning() && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.terminate().catch(terminateError => {
      this.emitter?.emit(PoolEvents.error, terminateError)
    })
  }
  // User supplied handlers default to a no-op; registration order is preserved.
  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler('error', onWorkerError)
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  // The worker node leaves the pool once its worker has exited.
  workerNode.registerOnceWorkerEventHandler('exit', () => {
    this.removeWorkerNode(workerNode)
  })
  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener handling the
 * kill messages sent by the worker is registered, the worker is asked to
 * check its activity, the pool task functions are sent to it and the worker
 * node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    // The worker node may not be found: its usage is then undefined.
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        // Guard the usage dereferences below against a missing worker node.
        // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
        workerUsage != null &&
        ((this.opts.enableTasksQueue === false &&
          workerUsage.tasks.executing === 0) ||
          (this.opts.enableTasksQueue === true &&
            workerUsage.tasks.executing === 0 &&
            this.tasksQueueSize(localWorkerNodeKey) === 0)))
    ) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  if (this.taskFunctions.size > 0) {
    for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
      this.sendTaskFunctionOperationToWorker(workerNodeKey, {
        taskFunctionOperation: 'add',
        taskFunctionName,
        taskFunction: taskFunction.toString()
      }).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.info.dynamic = true
  // Some strategy policies require the dynamic worker node to be usable right away.
  if (
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerReady === true ||
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerUsage === true
  ) {
    workerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Registers once a message listener callback on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Deregisters a message listener callback from the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Hook called once a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Tasks stealing handlers are only relevant with a tasks queue.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on(
      'idle',
      this.handleWorkerNodeIdleEvent
    )
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleWorkerNodeBackPressureEvent
    )
  }
}
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
 * Sends the statistics message to the worker identified by its worker node key.
 * Each statistic flag defaults to `false` when no worker choice strategy context is set.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
      .runTime.aggregate ?? false
  const elu =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
      .aggregate ?? false
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu }
  })
}
/**
 * Tells whether tasks stealing is pointless: at most one worker node in the
 * pool or no queued tasks.
 *
 * @returns `true` if no task can be stolen, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
/**
 * Executes the task on the given worker node if it shall be executed, enqueues it otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
/**
 * Moves every task queued on the given worker node to other worker nodes.
 * Each task goes to the ready worker node with the fewest queued tasks,
 * the first worker node being the starting candidate.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // The destination is recomputed for each task since queue sizes change.
    let destinationWorkerNodeKey = 0
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (
        candidate.info.ready &&
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(workerNodeKey)!
    )
  }
}
/**
 * Increments the stolen tasks counters of a worker node: the worker node
 * usage one and, when applicable, the task function usage one.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The stolen task name.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (stealingWorkerNode?.usage != null) {
    stealingWorkerNode.usage.tasks.stolen += 1
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) != null) {
    const taskFunctionUsage =
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)!
    taskFunctionUsage.tasks.stolen += 1
  }
}
/**
 * Increments the sequentially stolen tasks counter of a worker node usage.
 * Does nothing if the worker node or its usage is not found.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (stealingWorkerNode?.usage == null) {
    return
  }
  stealingWorkerNode.usage.tasks.sequentiallyStolen += 1
}
/**
 * Increments the sequentially stolen tasks counter of a task function usage
 * on a worker node, when task function usage shall be updated and exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The stolen task name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) != null) {
    const taskFunctionUsage =
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)!
    taskFunctionUsage.tasks.sequentiallyStolen += 1
  }
}
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node usage.
 * Does nothing if the worker node or its usage is not found.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (stealingWorkerNode?.usage == null) {
    return
  }
  stealingWorkerNode.usage.tasks.sequentiallyStolen = 0
}
/**
 * Resets to zero the sequentially stolen tasks counter of a task function
 * usage on a worker node, when task function usage shall be updated and exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) != null) {
    const taskFunctionUsage =
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)!
    taskFunctionUsage.tasks.sequentiallyStolen = 0
  }
}
/**
 * Handles the worker node idle event: the idle worker node steals a queued
 * task from another worker node, then this handler is invoked again after an
 * exponential delay based on the number of sequentially stolen tasks.
 *
 * Stealing stops when no task can be stolen, when more than half of the
 * worker nodes are already stealing, or when the worker node got work of its
 * own after having stolen tasks.
 *
 * @param eventDetail - The worker node event detail; `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `workerNodeKey` is not defined or matches no worker node in the pool.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    if (workerInfo != null && previousStolenTask != null) {
      workerInfo.stealing = false
    }
    return
  }
  // Checked before the worker node is dereferenced so that a missing worker
  // node yields this explicit error instead of a TypeError.
  if (workerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // The worker node is no longer idle: end the stealing sequence.
  if (
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    workerInfo.stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const taskName of this.workerNodes[workerNodeKey].info
      .taskFunctionNames!) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  workerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
    // The per task function sequence continues only with the same task name.
    if (
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    ) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    }
  }
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
/**
 * Makes the given worker node steal one task from the ready, non stealing
 * worker node having the most queued tasks.
 *
 * @param workerNodeKey - The key of the worker node stealing the task.
 * @returns The stolen task, `undefined` if no worker node had a task to steal.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  // Sorted copy, most queued tasks first; the pool array order is left untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  const sourceWorkerNode = workerNodes.find(
    sourceWorkerNode =>
      sourceWorkerNode.info.ready &&
      !sourceWorkerNode.info.stealing &&
      // Indexes of the sorted copy are not worker node keys: the stealing
      // worker node is excluded by identity.
      sourceWorkerNode !== destinationWorkerNode &&
      sourceWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode != null) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const task = sourceWorkerNode.popTask()!
    this.handleTask(workerNodeKey, task)
    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
    return task
  }
}
/**
 * Handles the worker node back pressure event: the other ready, non stealing
 * worker nodes whose tasks queue is below the back pressure threshold each
 * steal one task from the worker node under back pressure, the least loaded
 * ones first.
 *
 * @param eventDetail - The worker node event detail carrying the id of the worker under back pressure.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If a stealing worker node is not found in the pool.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sorted copy, fewest queued tasks first; the pool array order is left untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      // Indexes of the sorted copy are not worker node keys: the key is
      // resolved in the pool worker nodes array.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      workerInfo.stealing = true
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      workerInfo.stealing = false
    }
  }
}
/**
 * Message listener registered on each worker. Dispatches worker ready
 * responses, task execution responses and task function names messages.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    if (workerInfo != null) {
      workerInfo.taskFunctionNames = taskFunctionNames
    }
  }
}
/**
 * Emits the pool ready event once, when the pool is ready and the event has
 * not been emitted yet.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
/**
 * Handles a worker ready response: stores the ready flag and the task
 * function names on the worker node, then checks whether the pool ready
 * event shall be emitted.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready !== true) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const { info } = this.workerNodes[readyWorkerNodeKey]
  info.ready = ready
  info.taskFunctionNames = taskFunctionNames
  this.checkAndEmitReadyEvent()
}
/**
 * Handles a task execution response: settles the promise bound to the task
 * id, runs the after task execution hook and, when the tasks queue is
 * enabled, executes the next queued task or emits the worker node idle event.
 *
 * Messages whose task id has no pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse != null) {
    const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
    const workerNode = this.workerNodes[workerNodeKey]
    if (workerError != null) {
      this.emitter?.emit(PoolEvents.taskError, workerError)
      asyncResource != null
        ? asyncResource.runInAsyncScope(
          reject,
          this.emitter,
          workerError.message
        )
        : reject(workerError.message)
    } else {
      asyncResource != null
        ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
        : resolve(data as Response)
    }
    asyncResource?.emitDestroy()
    this.afterTaskExecutionHook(workerNodeKey, message)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.promiseResponseMap.delete(taskId!)
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.emit('taskFinished', taskId)
    if (
      this.opts.enableTasksQueue === true &&
      !this.destroying &&
      // The worker node may not be found at the stored key: skip queue
      // handling instead of dereferencing undefined.
      // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
      workerNode != null
    ) {
      const workerNodeTasksUsage = workerNode.usage.tasks
      // Run the next queued task if the concurrency limit allows it.
      if (
        this.tasksQueueSize(workerNodeKey) > 0 &&
        workerNodeTasksUsage.executing <
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          this.opts.tasksQueueOptions!.concurrency!
      ) {
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
      }
      // Nothing executing, nothing queued, no stealing sequence: the worker node is idle.
      if (
        workerNodeTasksUsage.executing === 0 &&
        this.tasksQueueSize(workerNodeKey) === 0 &&
        workerNodeTasksUsage.sequentiallyStolen === 0
      ) {
        workerNode.emit('idle', {
          workerId: workerId as number,
          workerNodeKey
        })
      }
    }
  }
}
/**
 * Emits the pool busy event when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
/**
 * Emits the pool back pressure event when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
/**
 * Emits the events related to dynamic worker creation.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
 * Gets the worker information of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if no worker node is found at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
/**
 * Creates a worker node. Its tasks queue back pressure size comes from the
 * tasks queue options, or from the default tasks queue options when unset.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const workerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize:
        this.opts.tasksQueueOptions?.size ??
        getDefaultTasksQueueOptions(
          this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
        ).size
    }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  return workerNode
}
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  // The key is the position of the worker node in the pool worker nodes.
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
/**
 * Emits the pool empty event when the pool is empty and re-arms the pool
 * ready event.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
/**
 * Removes the worker node from the pool worker nodes and from the worker
 * choice strategy context, then checks whether the pool empty event shall be emitted.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const found = removedWorkerNodeKey !== -1
  if (found) {
    this.workerNodes.splice(removedWorkerNodeKey, 1)
    this.workerChoiceStrategyContext?.remove(removedWorkerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
/**
 * Flags the worker node identified by its key as not ready.
 * Does nothing if no worker node is found at that key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and
 * every worker node has back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
/**
 * Executes the given task on the worker identified by its worker node key:
 * runs the before task execution hook, sends the task to the worker and
 * checks the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node identified by its key, then
 * checks the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, if any.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
/**
 * Executes every task queued on the worker node identified by its key, then
 * clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const queuedTask = this.dequeueTask(workerNodeKey)!
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}