1 import { AsyncResource
} from
'node:async_hooks'
2 import { randomUUID
} from
'node:crypto'
3 import { EventEmitterAsyncResource
} from
'node:events'
4 import { performance
} from
'node:perf_hooks'
5 import type { TransferListItem
} from
'node:worker_threads'
9 PromiseResponseWrapper
,
11 TaskFunctionProperties
12 } from
'../utility-types.js'
15 buildTaskFunctionProperties
,
30 } from
'../worker/task-functions.js'
31 import { KillBehaviors
} from
'../worker/worker-options.js'
39 type TasksQueueOptions
43 WorkerChoiceStrategies
,
44 type WorkerChoiceStrategy
,
45 type WorkerChoiceStrategyOptions
46 } from
'./selection-strategies/selection-strategies-types.js'
47 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
50 checkValidTasksQueueOptions
,
51 checkValidWorkerChoiceStrategy
,
52 getDefaultTasksQueueOptions
,
54 updateRunTimeWorkerUsage
,
55 updateTaskStatisticsWorkerUsage
,
56 updateWaitTimeWorkerUsage
,
59 import { version
} from
'./version.js'
64 WorkerNodeEventDetail
,
67 import { WorkerNode
} from
'./worker-node.js'
70 * Base class that implements some shared logic for all poolifier pools.
72 * @typeParam Worker - Type of worker which manages this pool.
73 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
74 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
76 export abstract class AbstractPool
<
77 Worker
extends IWorker
,
80 > implements IPool
<Worker
, Data
, Response
> {
82 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
85 public emitter
?: EventEmitterAsyncResource
88 * The task execution response promise map:
89 * - `key`: The message id of each submitted task.
90 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
92 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
94 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
95 new Map
<string, PromiseResponseWrapper
<Response
>>()
98 * Worker choice strategy context referencing a worker choice algorithm implementation.
100 protected workerChoiceStrategyContext
?: WorkerChoiceStrategyContext
<
107 * The task functions added at runtime map:
108 * - `key`: The task function name.
109 * - `value`: The task function object.
111 private readonly taskFunctions
: Map
<
113 TaskFunctionObject
<Data
, Response
>
117 * Whether the pool is started or not.
119 private started
: boolean
121 * Whether the pool is starting or not.
123 private starting
: boolean
125 * Whether the pool is destroying or not.
127 private destroying
: boolean
129 * Whether the minimum number of workers is starting or not.
131 private startingMinimumNumberOfWorkers
: boolean
133 * Whether the pool ready event has been emitted or not.
135 private readyEventEmitted
: boolean
137 * The start timestamp of the pool.
139 private readonly startTimestamp
142 * Constructs a new poolifier pool.
144 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
145 * @param filePath - Path to the worker file.
146 * @param opts - Options for the pool.
147 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
150 protected readonly minimumNumberOfWorkers
: number,
151 protected readonly filePath
: string,
152 protected readonly opts
: PoolOptions
<Worker
>,
153 protected readonly maximumNumberOfWorkers
?: number
155 if (!this.isMain()) {
157 'Cannot start a pool from a worker with the same type as the pool'
161 checkFilePath(this.filePath
)
162 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
163 this.checkPoolOptions(this.opts
)
165 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
166 this.executeTask
= this.executeTask
.bind(this)
167 this.enqueueTask
= this.enqueueTask
.bind(this)
169 if (this.opts
.enableEvents
=== true) {
170 this.initializeEventEmitter()
172 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
178 this.opts
.workerChoiceStrategy
,
179 this.opts
.workerChoiceStrategyOptions
184 this.taskFunctions
= new Map
<string, TaskFunctionObject
<Data
, Response
>>()
187 this.starting
= false
188 this.destroying
= false
189 this.readyEventEmitted
= false
190 this.startingMinimumNumberOfWorkers
= false
191 if (this.opts
.startWorkers
=== true) {
195 this.startTimestamp
= performance
.now()
198 private checkPoolType (): void {
199 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
201 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
206 private checkMinimumNumberOfWorkers (
207 minimumNumberOfWorkers
: number | undefined
209 if (minimumNumberOfWorkers
== null) {
211 'Cannot instantiate a pool without specifying the number of workers'
213 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
215 'Cannot instantiate a pool with a non safe integer number of workers'
217 } else if (minimumNumberOfWorkers
< 0) {
218 throw new RangeError(
219 'Cannot instantiate a pool with a negative number of workers'
221 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
222 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
226 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
227 if (isPlainObject(opts
)) {
228 this.opts
.startWorkers
= opts
.startWorkers
?? true
229 checkValidWorkerChoiceStrategy(opts
.workerChoiceStrategy
)
230 this.opts
.workerChoiceStrategy
=
231 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
232 this.checkValidWorkerChoiceStrategyOptions(
233 opts
.workerChoiceStrategyOptions
235 if (opts
.workerChoiceStrategyOptions
!= null) {
236 this.opts
.workerChoiceStrategyOptions
= opts
.workerChoiceStrategyOptions
238 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
239 this.opts
.enableEvents
= opts
.enableEvents
?? true
240 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
241 if (this.opts
.enableTasksQueue
) {
242 checkValidTasksQueueOptions(opts
.tasksQueueOptions
)
243 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
244 opts
.tasksQueueOptions
248 throw new TypeError('Invalid pool options: must be a plain object')
252 private checkValidWorkerChoiceStrategyOptions (
253 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
256 workerChoiceStrategyOptions
!= null &&
257 !isPlainObject(workerChoiceStrategyOptions
)
260 'Invalid worker choice strategy options: must be a plain object'
264 workerChoiceStrategyOptions
?.weights
!= null &&
265 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
266 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
269 'Invalid worker choice strategy options: must have a weight for each worker node'
273 workerChoiceStrategyOptions
?.measurement
!= null &&
274 !Object.values(Measurements
).includes(
275 workerChoiceStrategyOptions
.measurement
279 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
/**
 * Creates the pool event emitter.
 *
 * The emitter is an `EventEmitterAsyncResource` whose name is built from the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
291 public get
info (): PoolInfo
{
296 started
: this.started
,
298 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
299 strategy
: this.opts
.workerChoiceStrategy
!,
300 strategyRetries
: this.workerChoiceStrategyContext
?.retriesCount
?? 0,
301 minSize
: this.minimumNumberOfWorkers
,
302 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
303 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
304 .runTime
.aggregate
=== true &&
305 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
306 .waitTime
.aggregate
&& {
307 utilization
: round(this.utilization
)
309 workerNodes
: this.workerNodes
.length
,
310 idleWorkerNodes
: this.workerNodes
.reduce(
311 (accumulator
, workerNode
) =>
312 workerNode
.usage
.tasks
.executing
=== 0
317 ...(this.opts
.enableTasksQueue
=== true && {
318 stealingWorkerNodes
: this.workerNodes
.reduce(
319 (accumulator
, workerNode
) =>
320 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
324 busyWorkerNodes
: this.workerNodes
.reduce(
325 (accumulator
, _workerNode
, workerNodeKey
) =>
326 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
329 executedTasks
: this.workerNodes
.reduce(
330 (accumulator
, workerNode
) =>
331 accumulator
+ workerNode
.usage
.tasks
.executed
,
334 executingTasks
: this.workerNodes
.reduce(
335 (accumulator
, workerNode
) =>
336 accumulator
+ workerNode
.usage
.tasks
.executing
,
339 ...(this.opts
.enableTasksQueue
=== true && {
340 queuedTasks
: this.workerNodes
.reduce(
341 (accumulator
, workerNode
) =>
342 accumulator
+ workerNode
.usage
.tasks
.queued
,
346 ...(this.opts
.enableTasksQueue
=== true && {
347 maxQueuedTasks
: this.workerNodes
.reduce(
348 (accumulator
, workerNode
) =>
349 accumulator
+ (workerNode
.usage
.tasks
.maxQueued
?? 0),
353 ...(this.opts
.enableTasksQueue
=== true && {
354 backPressure
: this.hasBackPressure()
356 ...(this.opts
.enableTasksQueue
=== true && {
357 stolenTasks
: this.workerNodes
.reduce(
358 (accumulator
, workerNode
) =>
359 accumulator
+ workerNode
.usage
.tasks
.stolen
,
363 failedTasks
: this.workerNodes
.reduce(
364 (accumulator
, workerNode
) =>
365 accumulator
+ workerNode
.usage
.tasks
.failed
,
368 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
369 .runTime
.aggregate
=== true && {
373 ...this.workerNodes
.map(
374 workerNode
=> workerNode
.usage
.runTime
.minimum
?? Infinity
380 ...this.workerNodes
.map(
381 workerNode
=> workerNode
.usage
.runTime
.maximum
?? -Infinity
385 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
386 .runTime
.average
&& {
389 this.workerNodes
.reduce
<number[]>(
390 (accumulator
, workerNode
) =>
391 accumulator
.concat(workerNode
.usage
.runTime
.history
),
397 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
401 this.workerNodes
.reduce
<number[]>(
402 (accumulator
, workerNode
) =>
403 accumulator
.concat(workerNode
.usage
.runTime
.history
),
411 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
412 .waitTime
.aggregate
=== true && {
416 ...this.workerNodes
.map(
417 workerNode
=> workerNode
.usage
.waitTime
.minimum
?? Infinity
423 ...this.workerNodes
.map(
424 workerNode
=> workerNode
.usage
.waitTime
.maximum
?? -Infinity
428 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
429 .waitTime
.average
&& {
432 this.workerNodes
.reduce
<number[]>(
433 (accumulator
, workerNode
) =>
434 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
440 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
441 .waitTime
.median
&& {
444 this.workerNodes
.reduce
<number[]>(
445 (accumulator
, workerNode
) =>
446 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
458 * The pool readiness boolean status.
460 private get
ready (): boolean {
465 this.workerNodes
.reduce(
466 (accumulator
, workerNode
) =>
467 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
471 ) >= this.minimumNumberOfWorkers
/**
 * The pool emptiness boolean status.
 *
 * `true` only when the minimum number of workers is zero and the pool currently holds no worker node.
 */
protected get empty (): boolean {
  if (this.minimumNumberOfWorkers !== 0) {
    return false
  }
  return this.workerNodes.length === 0
}
/**
 * The approximate pool utilization.
 *
 * Computed as the sum of the aggregated tasks run time and wait time of all worker nodes,
 * divided by the pool time capacity (elapsed time since the start timestamp multiplied by
 * the maximum number of workers, or the minimum number when no maximum is defined).
 *
 * @returns The pool utilization, or `0` when the pool time capacity is not strictly positive.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) *
    (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  // A zero, negative or NaN capacity would turn the ratio below into NaN/Infinity:
  // report no utilization instead of a non-finite number.
  if (!(poolTimeCapacity > 0)) {
    return 0
  }
  const totalTasksRunTime = this.workerNodes.reduce(
    (accumulator, workerNode) =>
      accumulator + (workerNode.usage.runTime.aggregate ?? 0),
    0
  )
  const totalTasksWaitTime = this.workerNodes.reduce(
    (accumulator, workerNode) =>
      accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
    0
  )
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
507 * If it is `'dynamic'`, it provides the `max` property.
509 protected abstract get
type (): PoolType
514 protected abstract get
worker (): WorkerType
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * A worker id is valid when it is defined and matches the id of one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // -1 means that no worker node of the pool carries this worker id.
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
/**
 * Sets the worker choice strategy used by the pool.
 *
 * The strategy is validated, stored in the pool options and forwarded to the worker choice
 * strategy context when one exists. Afterwards the usage of every worker node is reset and a
 * statistics message is sent to each worker.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @param workerChoiceStrategyOptions - The optional worker choice strategy options, applied when defined.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  const strategyContext = this.workerChoiceStrategyContext
  strategyContext?.setWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
/**
 * Sets the worker choice strategy options.
 *
 * The options are validated first. When defined they replace the ones stored in the pool
 * options; in every case the options currently stored in the pool options are then forwarded
 * to the worker choice strategy context when one exists.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const poolOptions = this.opts
  if (workerChoiceStrategyOptions != null) {
    poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  const strategyContext = this.workerChoiceStrategyContext
  strategyContext?.setOptions(poolOptions.workerChoiceStrategyOptions)
}
577 public enableTasksQueue (
579 tasksQueueOptions
?: TasksQueueOptions
581 if (this.opts
.enableTasksQueue
=== true && !enable
) {
582 this.unsetTaskStealing()
583 this.unsetTasksStealingOnBackPressure()
584 this.flushTasksQueues()
586 this.opts
.enableTasksQueue
= enable
587 this.setTasksQueueOptions(tasksQueueOptions
)
591 public setTasksQueueOptions (
592 tasksQueueOptions
: TasksQueueOptions
| undefined
594 if (this.opts
.enableTasksQueue
=== true) {
595 checkValidTasksQueueOptions(tasksQueueOptions
)
596 this.opts
.tasksQueueOptions
=
597 this.buildTasksQueueOptions(tasksQueueOptions
)
598 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
599 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
!)
600 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
601 this.unsetTaskStealing()
602 this.setTaskStealing()
604 this.unsetTaskStealing()
606 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
607 this.unsetTasksStealingOnBackPressure()
608 this.setTasksStealingOnBackPressure()
610 this.unsetTasksStealingOnBackPressure()
612 } else if (this.opts
.tasksQueueOptions
!= null) {
613 delete this.opts
.tasksQueueOptions
617 private buildTasksQueueOptions (
618 tasksQueueOptions
: TasksQueueOptions
| undefined
619 ): TasksQueueOptions
{
621 ...getDefaultTasksQueueOptions(
622 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
/**
 * Applies the given size as the tasks queue back pressure size of every worker node.
 *
 * @param size - The value assigned to `tasksQueueBackPressureSize` on each worker node.
 */
private setTasksQueueSize (size: number): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].tasksQueueBackPressureSize = size
  }
}
/**
 * Registers the pool idle event handler on the `'idle'` event of every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
640 private unsetTaskStealing (): void {
641 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
642 this.workerNodes
[workerNodeKey
].off(
644 this.handleWorkerNodeIdleEvent
649 private setTasksStealingOnBackPressure (): void {
650 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
651 this.workerNodes
[workerNodeKey
].on(
653 this.handleWorkerNodeBackPressureEvent
658 private unsetTasksStealingOnBackPressure (): void {
659 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
660 this.workerNodes
[workerNodeKey
].off(
662 this.handleWorkerNodeBackPressureEvent
/**
 * Whether the pool is full or not.
 *
 * The pool filling boolean status: `true` when the number of worker nodes has reached the
 * maximum number of workers, or the minimum number when no maximum is defined.
 */
protected get full (): boolean {
  const workerNodesLimit =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= workerNodesLimit
}
680 * Whether the pool is busy or not.
682 * The pool busyness boolean status.
684 protected abstract get
busy (): boolean
687 * Whether worker nodes are executing concurrently their tasks quota or not.
689 * @returns Worker nodes busyness boolean status.
691 protected internalBusy (): boolean {
692 if (this.opts
.enableTasksQueue
=== true) {
694 this.workerNodes
.findIndex(
696 workerNode
.info
.ready
&&
697 workerNode
.usage
.tasks
.executing
<
698 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
699 this.opts
.tasksQueueOptions
!.concurrency
!
704 this.workerNodes
.findIndex(
706 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
/**
 * Whether the worker node given its worker node key is busy or not.
 *
 * With the tasks queue enabled, a worker node is busy once its number of executing tasks
 * reaches the tasks queue concurrency; otherwise it is busy as soon as it executes a task.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing >= this.opts.tasksQueueOptions!.concurrency!
}
722 private async sendTaskFunctionOperationToWorker (
723 workerNodeKey
: number,
724 message
: MessageValue
<Data
>
725 ): Promise
<boolean> {
726 return await new Promise
<boolean>((resolve
, reject
) => {
727 const taskFunctionOperationListener
= (
728 message
: MessageValue
<Response
>
730 this.checkMessageWorkerId(message
)
731 const workerId
= this.getWorkerInfo(workerNodeKey
)?.id
733 message
.taskFunctionOperationStatus
!= null &&
734 message
.workerId
=== workerId
736 if (message
.taskFunctionOperationStatus
) {
741 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
745 this.deregisterWorkerMessageListener(
746 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
747 taskFunctionOperationListener
751 this.registerWorkerMessageListener(
753 taskFunctionOperationListener
755 this.sendToWorker(workerNodeKey
, message
)
759 private async sendTaskFunctionOperationToWorkers (
760 message
: MessageValue
<Data
>
761 ): Promise
<boolean> {
762 return await new Promise
<boolean>((resolve
, reject
) => {
763 const responsesReceived
= new Array<MessageValue
<Response
>>()
764 const taskFunctionOperationsListener
= (
765 message
: MessageValue
<Response
>
767 this.checkMessageWorkerId(message
)
768 if (message
.taskFunctionOperationStatus
!= null) {
769 responsesReceived
.push(message
)
770 if (responsesReceived
.length
=== this.workerNodes
.length
) {
772 responsesReceived
.every(
773 message
=> message
.taskFunctionOperationStatus
=== true
778 responsesReceived
.some(
779 message
=> message
.taskFunctionOperationStatus
=== false
782 const errorResponse
= responsesReceived
.find(
783 response
=> response
.taskFunctionOperationStatus
=== false
787 `Task function operation '${
788 message.taskFunctionOperation as string
789 }' failed on worker ${errorResponse?.workerId} with error: '${
790 errorResponse?.workerError?.message
795 this.deregisterWorkerMessageListener(
796 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
797 taskFunctionOperationsListener
802 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
803 this.registerWorkerMessageListener(
805 taskFunctionOperationsListener
807 this.sendToWorker(workerNodeKey
, message
)
813 public hasTaskFunction (name
: string): boolean {
814 for (const workerNode
of this.workerNodes
) {
816 Array.isArray(workerNode
.info
.taskFunctionsProperties
) &&
817 workerNode
.info
.taskFunctionsProperties
.some(
818 taskFunctionProperties
=> taskFunctionProperties
.name
=== name
828 public async addTaskFunction (
830 fn
: TaskFunction
<Data
, Response
> | TaskFunctionObject
<Data
, Response
>
831 ): Promise
<boolean> {
832 if (typeof name
!== 'string') {
833 throw new TypeError('name argument must be a string')
835 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
836 throw new TypeError('name argument must not be an empty string')
838 if (typeof fn
=== 'function') {
839 fn
= { taskFunction
: fn
} satisfies TaskFunctionObject
<Data
, Response
>
841 if (typeof fn
.taskFunction
!== 'function') {
842 throw new TypeError('taskFunction property must be a function')
844 const opResult
= await this.sendTaskFunctionOperationToWorkers({
845 taskFunctionOperation
: 'add',
846 taskFunctionProperties
: buildTaskFunctionProperties(name
, fn
),
847 taskFunction
: fn
.taskFunction
.toString()
849 this.taskFunctions
.set(name
, fn
)
854 public async removeTaskFunction (name
: string): Promise
<boolean> {
855 if (!this.taskFunctions
.has(name
)) {
857 'Cannot remove a task function not handled on the pool side'
860 const opResult
= await this.sendTaskFunctionOperationToWorkers({
861 taskFunctionOperation
: 'remove',
862 taskFunctionProperties
: buildTaskFunctionProperties(
864 this.taskFunctions
.get(name
)
867 this.deleteTaskFunctionWorkerUsages(name
)
868 this.taskFunctions
.delete(name
)
873 public listTaskFunctionsProperties (): TaskFunctionProperties
[] {
874 for (const workerNode
of this.workerNodes
) {
876 Array.isArray(workerNode
.info
.taskFunctionsProperties
) &&
877 workerNode
.info
.taskFunctionsProperties
.length
> 0
879 return workerNode
.info
.taskFunctionsProperties
886 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
887 return await this.sendTaskFunctionOperationToWorkers({
888 taskFunctionOperation
: 'default',
889 taskFunctionProperties
: buildTaskFunctionProperties(
891 this.taskFunctions
.get(name
)
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].deleteTaskFunctionWorkerUsage(name)
  }
}
/**
 * Whether a task shall be executed right away on the given worker node.
 *
 * This is the case when the worker node tasks queue is empty and its number of executing
 * tasks is below the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed immediately, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing < this.opts.tasksQueueOptions!.concurrency!
}
912 public async execute (
915 transferList
?: readonly TransferListItem
[]
916 ): Promise
<Response
> {
917 return await new Promise
<Response
>((resolve
, reject
) => {
919 reject(new Error('Cannot execute a task on not started pool'))
922 if (this.destroying
) {
923 reject(new Error('Cannot execute a task on destroying pool'))
926 if (name
!= null && typeof name
!== 'string') {
927 reject(new TypeError('name argument must be a string'))
932 typeof name
=== 'string' &&
933 name
.trim().length
=== 0
935 reject(new TypeError('name argument must not be an empty string'))
938 if (transferList
!= null && !Array.isArray(transferList
)) {
939 reject(new TypeError('transferList argument must be an array'))
942 const timestamp
= performance
.now()
943 const workerNodeKey
= this.chooseWorkerNode()
944 const task
: Task
<Data
> = {
945 name
: name
?? DEFAULT_TASK_NAME
,
946 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
947 data
: data
?? ({} as Data
),
952 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
953 this.promiseResponseMap
.set(task
.taskId
!, {
957 ...(this.emitter
!= null && {
958 asyncResource
: new AsyncResource('poolifier:task', {
959 triggerAsyncId
: this.emitter
.asyncId
,
960 requireManualDestroy
: true
965 this.opts
.enableTasksQueue
=== false ||
966 (this.opts
.enableTasksQueue
=== true &&
967 this.shallExecuteTask(workerNodeKey
))
969 this.executeTask(workerNodeKey
, task
)
971 this.enqueueTask(workerNodeKey
, task
)
977 * Starts the minimum number of workers.
979 private startMinimumNumberOfWorkers (): void {
980 this.startingMinimumNumberOfWorkers
= true
982 this.workerNodes
.reduce(
983 (accumulator
, workerNode
) =>
984 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
986 ) < this.minimumNumberOfWorkers
988 this.createAndSetupWorkerNode()
990 this.startingMinimumNumberOfWorkers
= false
994 public start (): void {
996 throw new Error('Cannot start an already started pool')
999 throw new Error('Cannot start an already starting pool')
1001 if (this.destroying
) {
1002 throw new Error('Cannot start a destroying pool')
1004 this.starting
= true
1005 this.startMinimumNumberOfWorkers()
1006 this.starting
= false
1011 public async destroy (): Promise
<void> {
1012 if (!this.started
) {
1013 throw new Error('Cannot destroy an already destroyed pool')
1015 if (this.starting
) {
1016 throw new Error('Cannot destroy an starting pool')
1018 if (this.destroying
) {
1019 throw new Error('Cannot destroy an already destroying pool')
1021 this.destroying
= true
1023 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
1024 await this.destroyWorkerNode(workerNodeKey
)
1027 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
1028 this.emitter
?.emitDestroy()
1029 this.readyEventEmitted
= false
1030 this.destroying
= false
1031 this.started
= false
1034 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
1035 await new Promise
<void>((resolve
, reject
) => {
1036 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1037 if (this.workerNodes
[workerNodeKey
] == null) {
1041 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1042 this.checkMessageWorkerId(message
)
1043 if (message
.kill
=== 'success') {
1045 } else if (message
.kill
=== 'failure') {
1048 `Kill message handling failed on worker ${message.workerId}`
1053 // FIXME: should be registered only once
1054 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1055 this.sendToWorker(workerNodeKey
, { kill
: true })
1060 * Terminates the worker node given its worker node key.
1062 * @param workerNodeKey - The worker node key.
1064 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1065 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1066 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1067 const workerNode
= this.workerNodes
[workerNodeKey
]
1068 await waitWorkerNodeEvents(
1072 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1073 getDefaultTasksQueueOptions(
1074 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1075 ).tasksFinishedTimeout
1077 await this.sendKillMessageToWorker(workerNodeKey
)
1078 await workerNode
.terminate()
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden.
 */
protected setupHook (): void {
  // No-op by default.
}
1092 * Returns whether the worker is the main worker or not.
1094 * @returns `true` if the worker is the main worker, `false` otherwise.
1096 protected abstract isMain (): boolean
1099 * Hook executed before the worker task execution.
1100 * Can be overridden.
1102 * @param workerNodeKey - The worker node key.
1103 * @param task - The task to execute.
1105 protected beforeTaskExecutionHook (
1106 workerNodeKey
: number,
1109 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1110 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1111 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1112 ++workerUsage
.tasks
.executing
1113 updateWaitTimeWorkerUsage(
1114 this.workerChoiceStrategyContext
,
1120 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1121 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1122 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(task
.name
!) !=
1125 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1126 const taskFunctionWorkerUsage
= this.workerNodes
[
1128 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1129 ].getTaskFunctionWorkerUsage(task
.name
!)!
1130 ++taskFunctionWorkerUsage
.tasks
.executing
1131 updateWaitTimeWorkerUsage(
1132 this.workerChoiceStrategyContext
,
1133 taskFunctionWorkerUsage
,
1140 * Hook executed after the worker task execution.
1141 * Can be overridden.
1143 * @param workerNodeKey - The worker node key.
1144 * @param message - The received message.
1146 protected afterTaskExecutionHook (
1147 workerNodeKey
: number,
1148 message
: MessageValue
<Response
>
1150 let needWorkerChoiceStrategyUpdate
= false
1151 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1152 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1153 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1154 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1155 updateRunTimeWorkerUsage(
1156 this.workerChoiceStrategyContext
,
1160 updateEluWorkerUsage(
1161 this.workerChoiceStrategyContext
,
1165 needWorkerChoiceStrategyUpdate
= true
1168 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1169 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1170 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1171 message
.taskPerformance
!.name
1174 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1175 const taskFunctionWorkerUsage
= this.workerNodes
[
1177 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1178 ].getTaskFunctionWorkerUsage(message
.taskPerformance
!.name
)!
1179 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1180 updateRunTimeWorkerUsage(
1181 this.workerChoiceStrategyContext
,
1182 taskFunctionWorkerUsage
,
1185 updateEluWorkerUsage(
1186 this.workerChoiceStrategyContext
,
1187 taskFunctionWorkerUsage
,
1190 needWorkerChoiceStrategyUpdate
= true
1192 if (needWorkerChoiceStrategyUpdate
) {
1193 this.workerChoiceStrategyContext
?.update(workerNodeKey
)
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * The update is required when the worker info exposes a task functions properties array
 * holding more than two entries.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionsProperties =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties
  if (!Array.isArray(taskFunctionsProperties)) {
    return false
  }
  return taskFunctionsProperties.length > 2
}
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * When a dynamic worker shall be created, it is created first and its key is returned if the
 * strategy policy allows dynamic worker usage; in every other case the choice is delegated to
 * the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const strategyPolicy = this.workerChoiceStrategyContext?.getStrategyPolicy()
    if (strategyPolicy?.dynamicWorkerUsage === true) {
      return dynamicWorkerNodeKey
    }
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return this.workerChoiceStrategyContext!.execute()
}
1234 * Conditions for dynamic worker creation.
1236 * @returns Whether to create a dynamic worker or not.
1238 protected abstract shallCreateDynamicWorker (): boolean
/**
 * Sends a message to worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: readonly TransferListItem[]
): void
/**
 * Creates a new, completely set up worker node.
 *
 * The handlers given in the pool options are registered on the worker together with one-shot
 * internal `error` and `exit` handlers, then the worker node is added to the pool and
 * `afterWorkerNodeSetup()` is run on it.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()

  // Internal one-shot error handler: flags the worker node as not ready, reports the error,
  // replaces the worker and moves its queued tasks away when enabled, then terminates it.
  const onceWorkerError = (error: Error): void => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (
      this.started &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    ) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else if (!this.startingMinimumNumberOfWorkers) {
        this.startMinimumNumberOfWorkers()
      }
    }
    if (
      this.started &&
      !this.destroying &&
      this.opts.enableTasksQueue === true
    ) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.terminate().catch((terminateError: unknown) => {
      this.emitter?.emit(PoolEvents.error, terminateError)
    })
  }

  // Internal one-shot exit handler: removes the worker node and restores the minimum
  // number of workers unless the pool is stopped, already doing so, or being destroyed.
  const onceWorkerExit = (): void => {
    this.removeWorkerNode(workerNode)
    if (
      this.started &&
      !this.startingMinimumNumberOfWorkers &&
      !this.destroying
    ) {
      this.startMinimumNumberOfWorkers()
    }
  }

  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('error', onceWorkerError)
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', onceWorkerExit)

  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, this registers a message listener that destroys the
 * worker node when the worker sends a kill message, sends a `checkActive` message to the worker,
 * sends it every pool task function and flags the worker node as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
    // Evaluated lazily, only for a soft kill: no task executing and, when the tasks queue is
    // enabled, no task queued either.
    const hasNoPendingTask = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return workerUsage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          workerUsage.tasks.executing === 0 &&
          this.tasksQueueSize(localWorkerNodeKey) === 0
        )
      }
      return false
    }
    // Kill message received from worker
    const killRequested =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingTask())
    if (!killRequested) {
      return
    }
    // Flag the worker node as not ready immediately
    this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
    this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Iterating an empty map is a no-op, so no explicit size check is needed.
  for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionProperties: buildTaskFunctionProperties(
        taskFunctionName,
        taskFunctionObject
      ),
      taskFunction: taskFunctionObject.taskFunction.toString()
    }).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.info.dynamic = true
  const readyAtCreation =
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerReady === true ||
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerUsage === true
  if (readyAtCreation) {
    workerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Registers once a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Deregisters a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup and statistics
 * messages and, when the tasks queue is enabled, subscribes the task stealing handlers that
 * are enabled in the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const stealOnIdle = this.opts.tasksQueueOptions?.taskStealing === true
  if (stealOnIdle) {
    this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
  }
  const stealOnBackPressure =
    this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true
  if (stealOnBackPressure) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleWorkerNodeBackPressureEvent
    )
  }
}
/**
 * Sends the startup message to worker given its worker node key.
 * Called by `afterWorkerNodeSetup()` right after the message listener registration.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `runTime` and `elu` aggregation flags read from the task statistics
 * requirements of the worker choice strategy context; each flag defaults to `false`.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime
      .aggregate ?? false
  const elu =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
      .aggregate ?? false
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu }
  })
}
/**
 * Tells whether task stealing is pointless: at most one worker node in the pool, or no queued task.
 *
 * @returns `true` if no task can be stolen, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
/**
 * Executes the task on the given worker node if `shallExecuteTask()` allows it, enqueues it otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
/**
 * Moves the tasks queued on the given worker node to the other worker nodes of the pool.
 *
 * Each task goes to the ready worker node with the fewest queued tasks. When no other worker
 * node is ready, the other worker node with the fewest queued tasks is used instead.
 * The source worker node is never selected as destination: handing a task back to the node
 * being drained would re-queue it there and could keep the loop from terminating.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks shall be redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  // Returns the key of the least loaded candidate worker node, -1 if there is none.
  const selectDestinationWorkerNodeKey = (readyOnly: boolean): number => {
    let selectedWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      if (
        candidateWorkerNodeKey === workerNodeKey ||
        (readyOnly && !candidateWorkerNode.info.ready)
      ) {
        continue
      }
      if (
        selectedWorkerNodeKey === -1 ||
        candidateWorkerNode.usage.tasks.queued <
          this.workerNodes[selectedWorkerNodeKey].usage.tasks.queued
      ) {
        selectedWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    return selectedWorkerNodeKey
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = selectDestinationWorkerNodeKey(true)
    if (destinationWorkerNodeKey === -1) {
      // No other ready worker node: fall back to any other worker node.
      destinationWorkerNodeKey = selectDestinationWorkerNodeKey(false)
    }
    if (destinationWorkerNodeKey === -1) {
      // No other worker node at all: nothing can be redistributed.
      return
    }
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(workerNodeKey)!
    )
  }
}
/**
 * Increments the stolen tasks counter of a worker node and, when task function usage is
 * tracked for it, the stolen tasks counter of the given task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = workerNode?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage != null) {
    ++workerUsage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
/**
 * Increments the sequentially stolen tasks counter of a worker node, if its usage exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  ++workerUsage.tasks.sequentiallyStolen
}
/**
 * Increments the sequentially stolen tasks counter of a task function on a worker node,
 * when task function usage is tracked for that worker node and the usage exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
  }
}
/**
 * Resets the sequentially stolen tasks counter of a worker node to zero, if its usage exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  workerUsage.tasks.sequentiallyStolen = 0
}
/**
 * Resets to zero the sequentially stolen tasks counter of a task function on a worker node,
 * when task function usage is tracked for that worker node and the usage exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
/**
 * Handles the `idle` event of a worker node by making it steal tasks from other worker nodes.
 *
 * The handler re-schedules itself after `sleep(exponentialDelay(...))`, passing the task it
 * just stole, until stealing is no longer possible or the worker node has work of its own again.
 *
 * @param eventDetail - The worker node event detail; its `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `workerNodeKey` is not defined or does not match a worker node of the pool.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    // Nothing to steal, or too many worker nodes already stealing: end the stealing sequence.
    if (workerInfo != null && previousStolenTask != null) {
      workerInfo.stealing = false
    }
    return
  }
  // Checked before the worker node usage is read, so that an unknown worker node key is
  // reported with this explicit error instead of a TypeError on the usage access below.
  if (workerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  if (
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    // The worker node has work again: stop stealing and reset the sequential counters.
    workerInfo.stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskFunctionProperties.name
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  workerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
    if (
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    ) {
      // First steal of this task function, or same task function as the previous steal.
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    }
  }
  // Schedule the next stealing attempt; errors it raises are emitted as pool error events.
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
/**
 * Makes the given worker node steal one task from another worker node.
 *
 * The source is the first worker node, by decreasing number of queued tasks, that is ready,
 * not stealing, has queued tasks and is not the stealing worker node itself. The candidates
 * are compared by identity: positions in the sorted copy are not pool worker node keys.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  const sourceWorkerNode = workerNodes.find(
    candidateWorkerNode =>
      candidateWorkerNode.info.ready &&
      !candidateWorkerNode.info.stealing &&
      candidateWorkerNode !== destinationWorkerNode &&
      candidateWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return undefined
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const task = sourceWorkerNode.popTask()!
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
  return task
}
/**
 * Handles the back pressure event of a worker node by letting the other worker nodes take
 * tasks from its queue.
 *
 * Candidates are visited by increasing number of queued tasks. Each one that is ready, not
 * stealing, different from the source and below the tasks queue size minus one receives one
 * task, as long as the source still has queued tasks. Every candidate is paired with its real
 * pool worker node key before sorting, because positions in a sorted copy are not pool keys.
 *
 * @param eventDetail - The worker node event detail carrying the source `workerId`.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If a candidate worker node key does not match a worker node of the pool.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const candidates = this.workerNodes
    .map((workerNode, workerNodeKey) => ({ workerNode, workerNodeKey }))
    .sort(
      (candidateA, candidateB) =>
        candidateA.workerNode.usage.tasks.queued -
        candidateB.workerNode.usage.tasks.queued
    )
  for (const { workerNode, workerNodeKey } of candidates) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      // Flag the worker node as stealing only for the duration of the task transfer.
      workerInfo.stealing = true
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      workerInfo.stealing = false
    }
  }
}
/**
 * This method is the message listener registered on each worker.
 *
 * It dispatches on the message content: a ready response, a task functions properties update
 * or a task execution response. Any other message is ignored.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionsProperties } = message
  if (taskFunctionsProperties != null) {
    if (ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    // Task function properties message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    if (workerInfo != null) {
      workerInfo.taskFunctionsProperties = taskFunctionsProperties
    }
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
  }
}
/**
 * Emits the pool `ready` event once, the first time the pool is found ready.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
/**
 * Handles the ready response of a worker: stores the ready flag and the task functions
 * properties in the worker node information, then checks whether the pool `ready` event
 * shall be emitted.
 *
 * @param message - The ready response message received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker did not report itself as ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionsProperties } = message
  if (!(ready ?? false)) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.workerNodes[workerNodeKey].info
  workerInfo.ready = ready
  workerInfo.taskFunctionsProperties = taskFunctionsProperties
  this.checkAndEmitReadyEvent()
}
/**
 * Handles a task execution response received from a worker.
 *
 * Settles the promise registered for the task id, runs the after task execution hook, removes
 * the promise map entry and emits `taskFinished` on the worker node. When the tasks queue is
 * enabled and the pool is not being destroyed, it then executes the next queued task if the
 * concurrency allows it and emits `idle` on the worker node when it has nothing left to do.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode == null
  ) {
    return
  }
  const workerNodeTasksUsage = workerNode.usage.tasks
  const canExecuteQueuedTask =
    this.tasksQueueSize(workerNodeKey) > 0 &&
    workerNodeTasksUsage.executing <
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.concurrency!
  if (canExecuteQueuedTask) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  const isIdle =
    workerNodeTasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    workerNodeTasksUsage.sequentiallyStolen === 0
  if (isIdle) {
    workerNode.emit('idle', {
      workerId: workerId as number,
      workerNodeKey
    })
  }
}
/**
 * Emits the pool `busy` event with the pool information when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
/**
 * Emits the pool `backPressure` event with the pool information when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
/**
 * Emits dynamic worker creation events.
 * Called at the end of `createAndSetupDynamicWorkerNode()`.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  return workerNode?.info
}
/**
 * Creates a worker node.
 *
 * The tasks queue back pressure size comes from the tasks queue options when set, from the
 * default tasks queue options otherwise. The worker node is flagged as ready right away when
 * the pool is starting.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const workerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize:
        this.opts.tasksQueueOptions?.size ??
        getDefaultTasksQueueOptions(
          this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
        ).size
    }
  )
  // Flag the worker node as ready at pool startup.
  workerNode.info.ready = this.starting ? true : workerNode.info.ready
  return workerNode
}
/**
 * Adds the given worker node in the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.workerNodes.indexOf(workerNode)
  if (workerNodeKey !== -1) {
    return workerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
/**
 * Emits the pool `empty` event with the pool information when the pool is empty, and allows
 * the `ready` event to be emitted again.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
/**
 * Removes the worker node from the pool worker nodes.
 *
 * When the worker node is found, it is also removed from the worker choice strategy context.
 * The empty pool event check runs in every case.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const workerNodeKey = this.workerNodes.indexOf(workerNode)
  const isPoolWorkerNode = workerNodeKey !== -1
  if (isPoolWorkerNode) {
    this.workerNodes.splice(workerNodeKey, 1)
    this.workerChoiceStrategyContext?.remove(workerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
/**
 * Flags the worker node as not ready, if it exists.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and no worker node is
 * free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
/**
 * Executes the given task on the worker given its worker node key.
 *
 * Runs the before task execution hook, sends the task with its transfer list to the worker,
 * then checks whether task execution events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node, then checks whether task queuing events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
/**
 * Dequeues a task from the tasks queue of the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
/**
 * Executes every task queued on the worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}