1 import { AsyncResource
} from
'node:async_hooks'
2 import { randomUUID
} from
'node:crypto'
3 import { EventEmitterAsyncResource
} from
'node:events'
4 import { performance
} from
'node:perf_hooks'
5 import type { TransferListItem
} from
'node:worker_threads'
9 PromiseResponseWrapper
,
11 TaskFunctionProperties
12 } from
'../utility-types.js'
15 buildTaskFunctionProperties
,
30 } from
'../worker/task-functions.js'
31 import { KillBehaviors
} from
'../worker/worker-options.js'
39 type TasksQueueOptions
43 WorkerChoiceStrategies
,
44 type WorkerChoiceStrategy
,
45 type WorkerChoiceStrategyOptions
46 } from
'./selection-strategies/selection-strategies-types.js'
47 import { WorkerChoiceStrategiesContext
} from
'./selection-strategies/worker-choice-strategies-context.js'
50 checkValidTasksQueueOptions
,
51 checkValidWorkerChoiceStrategy
,
52 getDefaultTasksQueueOptions
,
54 updateRunTimeWorkerUsage
,
55 updateTaskStatisticsWorkerUsage
,
56 updateWaitTimeWorkerUsage
,
59 import { version
} from
'./version.js'
64 WorkerNodeEventDetail
,
67 import { WorkerNode
} from
'./worker-node.js'
70 * Base class that implements some shared logic for all poolifier pools.
72 * @typeParam Worker - Type of worker which manages this pool.
73 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
74 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
76 export abstract class AbstractPool
<
77 Worker
extends IWorker
,
80 > implements IPool
<Worker
, Data
, Response
> {
82 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
85 public emitter
?: EventEmitterAsyncResource
88 * The task execution response promise map:
89 * - `key`: The message id of each submitted task.
90 * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
92 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
94 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
95 new Map
<string, PromiseResponseWrapper
<Response
>>()
98 * Worker choice strategies context referencing worker choice algorithms implementation.
100 protected workerChoiceStrategiesContext
?: WorkerChoiceStrategiesContext
<
107 * The task functions added at runtime map:
108 * - `key`: The task function name.
109 * - `value`: The task function object.
111 private readonly taskFunctions
: Map
<
113 TaskFunctionObject
<Data
, Response
>
117 * Whether the pool is started or not.
119 private started
: boolean
121 * Whether the pool is starting or not.
123 private starting
: boolean
125 * Whether the pool is destroying or not.
127 private destroying
: boolean
129 * Whether the minimum number of workers is starting or not.
131 private startingMinimumNumberOfWorkers
: boolean
133 * Whether the pool ready event has been emitted or not.
135 private readyEventEmitted
: boolean
137 * The start timestamp of the pool.
139 private readonly startTimestamp
142 * Constructs a new poolifier pool.
144 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
145 * @param filePath - Path to the worker file.
146 * @param opts - Options for the pool.
147 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
150 protected readonly minimumNumberOfWorkers
: number,
151 protected readonly filePath
: string,
152 protected readonly opts
: PoolOptions
<Worker
>,
153 protected readonly maximumNumberOfWorkers
?: number
155 if (!this.isMain()) {
157 'Cannot start a pool from a worker with the same type as the pool'
161 checkFilePath(this.filePath
)
162 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
163 this.checkPoolOptions(this.opts
)
165 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
166 this.executeTask
= this.executeTask
.bind(this)
167 this.enqueueTask
= this.enqueueTask
.bind(this)
169 if (this.opts
.enableEvents
=== true) {
170 this.initializeEventEmitter()
172 this.workerChoiceStrategiesContext
= new WorkerChoiceStrategiesContext
<
178 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
179 [this.opts
.workerChoiceStrategy
!],
180 this.opts
.workerChoiceStrategyOptions
185 this.taskFunctions
= new Map
<string, TaskFunctionObject
<Data
, Response
>>()
188 this.starting
= false
189 this.destroying
= false
190 this.readyEventEmitted
= false
191 this.startingMinimumNumberOfWorkers
= false
192 if (this.opts
.startWorkers
=== true) {
196 this.startTimestamp
= performance
.now()
199 private checkPoolType (): void {
200 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
202 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
207 private checkMinimumNumberOfWorkers (
208 minimumNumberOfWorkers
: number | undefined
210 if (minimumNumberOfWorkers
== null) {
212 'Cannot instantiate a pool without specifying the number of workers'
214 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
216 'Cannot instantiate a pool with a non safe integer number of workers'
218 } else if (minimumNumberOfWorkers
< 0) {
219 throw new RangeError(
220 'Cannot instantiate a pool with a negative number of workers'
222 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
223 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
/**
 * Validates the pool options and stores on `this.opts` the default value of
 * every option that was left unset.
 *
 * Validation and assignment are interleaved on purpose: each option is checked
 * right before it is written, so an invalid option throws before later ones
 * are touched.
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: round robin unless specified otherwise.
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  // Worker choice strategy options are only stored when provided.
  const strategyOptions = opts.workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(strategyOptions)
  if (strategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = strategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions
    )
  }
}
253 private checkValidWorkerChoiceStrategyOptions (
254 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
| undefined
257 workerChoiceStrategyOptions
!= null &&
258 !isPlainObject(workerChoiceStrategyOptions
)
261 'Invalid worker choice strategy options: must be a plain object'
265 workerChoiceStrategyOptions
?.weights
!= null &&
266 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
267 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
270 'Invalid worker choice strategy options: must have a weight for each worker node'
274 workerChoiceStrategyOptions
?.measurement
!= null &&
275 !Object.values(Measurements
).includes(
276 workerChoiceStrategyOptions
.measurement
280 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
/**
 * Creates the pool event emitter and stores it in `this.emitter`.
 *
 * The async resource name is derived from the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
292 public get
info (): PoolInfo
{
297 started
: this.started
,
299 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
300 defaultStrategy
: this.opts
.workerChoiceStrategy
!,
301 strategyRetries
: this.workerChoiceStrategiesContext
?.retriesCount
?? 0,
302 minSize
: this.minimumNumberOfWorkers
,
303 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
304 ...(this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
305 .runTime
.aggregate
=== true &&
306 this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
307 .waitTime
.aggregate
&& {
308 utilization
: round(this.utilization
)
310 workerNodes
: this.workerNodes
.length
,
311 idleWorkerNodes
: this.workerNodes
.reduce(
312 (accumulator
, workerNode
) =>
313 workerNode
.usage
.tasks
.executing
=== 0
318 ...(this.opts
.enableTasksQueue
=== true && {
319 stealingWorkerNodes
: this.workerNodes
.reduce(
320 (accumulator
, workerNode
) =>
321 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
325 busyWorkerNodes
: this.workerNodes
.reduce(
326 (accumulator
, _workerNode
, workerNodeKey
) =>
327 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
330 executedTasks
: this.workerNodes
.reduce(
331 (accumulator
, workerNode
) =>
332 accumulator
+ workerNode
.usage
.tasks
.executed
,
335 executingTasks
: this.workerNodes
.reduce(
336 (accumulator
, workerNode
) =>
337 accumulator
+ workerNode
.usage
.tasks
.executing
,
340 ...(this.opts
.enableTasksQueue
=== true && {
341 queuedTasks
: this.workerNodes
.reduce(
342 (accumulator
, workerNode
) =>
343 accumulator
+ workerNode
.usage
.tasks
.queued
,
347 ...(this.opts
.enableTasksQueue
=== true && {
348 maxQueuedTasks
: this.workerNodes
.reduce(
349 (accumulator
, workerNode
) =>
350 accumulator
+ (workerNode
.usage
.tasks
.maxQueued
?? 0),
354 ...(this.opts
.enableTasksQueue
=== true && {
355 backPressure
: this.hasBackPressure()
357 ...(this.opts
.enableTasksQueue
=== true && {
358 stolenTasks
: this.workerNodes
.reduce(
359 (accumulator
, workerNode
) =>
360 accumulator
+ workerNode
.usage
.tasks
.stolen
,
364 failedTasks
: this.workerNodes
.reduce(
365 (accumulator
, workerNode
) =>
366 accumulator
+ workerNode
.usage
.tasks
.failed
,
369 ...(this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
370 .runTime
.aggregate
=== true && {
374 ...this.workerNodes
.map(
375 workerNode
=> workerNode
.usage
.runTime
.minimum
?? Infinity
381 ...this.workerNodes
.map(
382 workerNode
=> workerNode
.usage
.runTime
.maximum
?? -Infinity
386 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
387 .runTime
.average
&& {
390 this.workerNodes
.reduce
<number[]>(
391 (accumulator
, workerNode
) =>
392 accumulator
.concat(workerNode
.usage
.runTime
.history
),
398 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
402 this.workerNodes
.reduce
<number[]>(
403 (accumulator
, workerNode
) =>
404 accumulator
.concat(workerNode
.usage
.runTime
.history
),
412 ...(this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
413 .waitTime
.aggregate
=== true && {
417 ...this.workerNodes
.map(
418 workerNode
=> workerNode
.usage
.waitTime
.minimum
?? Infinity
424 ...this.workerNodes
.map(
425 workerNode
=> workerNode
.usage
.waitTime
.maximum
?? -Infinity
429 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
430 .waitTime
.average
&& {
433 this.workerNodes
.reduce
<number[]>(
434 (accumulator
, workerNode
) =>
435 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
441 ...(this.workerChoiceStrategiesContext
.getTaskStatisticsRequirements()
442 .waitTime
.median
&& {
445 this.workerNodes
.reduce
<number[]>(
446 (accumulator
, workerNode
) =>
447 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
459 * The pool readiness boolean status.
461 private get
ready (): boolean {
466 this.workerNodes
.reduce(
467 (accumulator
, workerNode
) =>
468 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
472 ) >= this.minimumNumberOfWorkers
/**
 * The pool emptiness boolean status.
 *
 * The pool is empty when its minimum number of workers is zero and it
 * currently holds no worker node.
 */
protected get empty (): boolean {
  if (this.minimumNumberOfWorkers !== 0) {
    return false
  }
  return this.workerNodes.length === 0
}
/**
 * The approximate pool utilization.
 *
 * Computed as the sum of the aggregated tasks run time and wait time of all
 * worker nodes, divided by the pool time capacity (time elapsed since the pool
 * start timestamp multiplied by the maximum number of workers, falling back to
 * the minimum number of workers when no maximum is defined).
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity =
    elapsedTime * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    // A worker node without aggregated statistics contributes nothing.
    totalTasksRunTime += usage.runTime.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
508 * If it is `'dynamic'`, it provides the `max` property.
510 protected abstract get
type (): PoolType
515 protected abstract get
worker (): WorkerType
/**
 * Checks that a message received from a worker carries a worker id that
 * belongs to one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  if (message.workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
/**
 * Looks up the worker node key (its index in the pool worker nodes array) of
 * the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, { info }] of this.workerNodes.entries()) {
    if (info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
/**
 * Sets the default worker choice strategy of the pool.
 *
 * The strategy is validated, stored in the pool options and forwarded to the
 * worker choice strategies context (when there is one); a statistics message
 * is then sent to every worker node.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @param workerChoiceStrategyOptions - The optional worker choice strategy options.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
    workerChoiceStrategy,
    workerChoiceStrategyOptions
  )
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
/**
 * Sets the worker choice strategy options of the pool.
 *
 * The options are validated first. When defined they replace the ones stored
 * in the pool options; the stored options are then pushed to the worker choice
 * strategies context (when there is one), whether they were replaced or not.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  if (workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  const { workerChoiceStrategiesContext: strategiesContext } = this
  strategiesContext?.setOptions(this.opts.workerChoiceStrategyOptions)
}
/**
 * Enables or disables the worker nodes tasks queue.
 *
 * When a currently enabled queue is being disabled, task stealing handlers are
 * removed and the tasks queues are flushed before the option is switched off.
 *
 * @param enable - Whether to enable the tasks queue or not.
 * @param tasksQueueOptions - The optional tasks queue options.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disablingActiveQueue = this.opts.enableTasksQueue === true && !enable
  if (disablingActiveQueue) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
/**
 * Sets the worker nodes tasks queue options.
 *
 * When the tasks queue is disabled, any stored tasks queue options are
 * deleted and nothing else happens. Otherwise the options are validated,
 * completed with defaults and stored, the queue size is propagated to the
 * worker nodes, and the task stealing handlers are re-registered according to
 * the resulting options.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(builtTasksQueueOptions.size!)
  // Handlers are always removed first so that enabling never registers them twice.
  this.unsetTaskStealing()
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
/**
 * Builds the effective tasks queue options: the default tasks queue options
 * for the pool size (maximum number of workers, or minimum when no maximum is
 * defined) overridden by the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The effective tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  )
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
/**
 * Applies the given size as the tasks queue back pressure size of every
 * worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
/**
 * Registers the worker node idle event handler on the `'idle'` event of every
 * worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
/**
 * Removes the worker node idle event handler from the `'idle'` event of every
 * worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
647 private setTasksStealingOnBackPressure (): void {
648 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
649 this.workerNodes
[workerNodeKey
].on(
651 this.handleWorkerNodeBackPressureEvent
656 private unsetTasksStealingOnBackPressure (): void {
657 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
658 this.workerNodes
[workerNodeKey
].off(
660 this.handleWorkerNodeBackPressureEvent
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the maximum
 * number of workers, or the minimum number when no maximum is defined.
 */
protected get full (): boolean {
  const maximumSize = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= maximumSize
}
678 * Whether the pool is busy or not.
680 * The pool busyness boolean status.
682 protected abstract get
busy (): boolean
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, the quota of a worker node is the configured
 * tasks queue concurrency; otherwise a worker node is at quota as soon as it
 * executes one task. The pool is internally busy when no ready worker node is
 * below its quota.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const hasReadyWorkerNodeBelowQuota = this.workerNodes.some(
      ({ info, usage }) =>
        info.ready &&
        usage.tasks.executing <
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          this.opts.tasksQueueOptions!.concurrency!
    )
    return !hasReadyWorkerNodeBelowQuota
  }
  const hasReadyIdleWorkerNode = this.workerNodes.some(
    ({ info, usage }) => info.ready && usage.tasks.executing === 0
  )
  return !hasReadyIdleWorkerNode
}
/**
 * Whether the worker node with the given key is busy or not.
 *
 * With the tasks queue enabled, a worker node is busy once it executes at
 * least as many tasks as the configured tasks queue concurrency; otherwise it
 * is busy as soon as it executes one task.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  if (this.opts.enableTasksQueue !== true) {
    return executing > 0
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing >= this.opts.tasksQueueOptions!.concurrency!
}
720 private async sendTaskFunctionOperationToWorker (
721 workerNodeKey
: number,
722 message
: MessageValue
<Data
>
723 ): Promise
<boolean> {
724 return await new Promise
<boolean>((resolve
, reject
) => {
725 const taskFunctionOperationListener
= (
726 message
: MessageValue
<Response
>
728 this.checkMessageWorkerId(message
)
729 const workerId
= this.getWorkerInfo(workerNodeKey
)?.id
731 message
.taskFunctionOperationStatus
!= null &&
732 message
.workerId
=== workerId
734 if (message
.taskFunctionOperationStatus
) {
739 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
743 this.deregisterWorkerMessageListener(
744 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
745 taskFunctionOperationListener
749 this.registerWorkerMessageListener(
751 taskFunctionOperationListener
753 this.sendToWorker(workerNodeKey
, message
)
757 private async sendTaskFunctionOperationToWorkers (
758 message
: MessageValue
<Data
>
759 ): Promise
<boolean> {
760 return await new Promise
<boolean>((resolve
, reject
) => {
761 const responsesReceived
= new Array<MessageValue
<Response
>>()
762 const taskFunctionOperationsListener
= (
763 message
: MessageValue
<Response
>
765 this.checkMessageWorkerId(message
)
766 if (message
.taskFunctionOperationStatus
!= null) {
767 responsesReceived
.push(message
)
768 if (responsesReceived
.length
=== this.workerNodes
.length
) {
770 responsesReceived
.every(
771 message
=> message
.taskFunctionOperationStatus
=== true
776 responsesReceived
.some(
777 message
=> message
.taskFunctionOperationStatus
=== false
780 const errorResponse
= responsesReceived
.find(
781 response
=> response
.taskFunctionOperationStatus
=== false
785 `Task function operation '${
786 message.taskFunctionOperation as string
787 }' failed on worker ${errorResponse?.workerId} with error: '${
788 errorResponse?.workerError?.message
793 this.deregisterWorkerMessageListener(
794 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
795 taskFunctionOperationsListener
800 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
801 this.registerWorkerMessageListener(
803 taskFunctionOperationsListener
805 this.sendToWorker(workerNodeKey
, message
)
/**
 * Whether a task function with the given name is known by at least one worker
 * node, according to the task functions properties stored in the worker nodes
 * info.
 *
 * @param name - The task function name.
 * @returns `true` if a worker node lists a task function with that name, `false` otherwise.
 */
public hasTaskFunction (name: string): boolean {
  return this.workerNodes.some(({ info }) => {
    const { taskFunctionsProperties } = info
    return (
      Array.isArray(taskFunctionsProperties) &&
      taskFunctionsProperties.some(
        taskFunctionProperties => taskFunctionProperties.name === name
      )
    )
  })
}
/**
 * Adds a task function to the pool.
 *
 * The arguments are validated, an `'add'` task function operation carrying
 * the stringified task function is sent to the workers, and the task function
 * object is then recorded in the pool side task functions map.
 *
 * @param name - The task function name.
 * @param fn - The task function, or a task function object wrapping it.
 * @returns The result of the task function operation sent to the workers.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `name` is not a string or is an empty string.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `taskFunction` property is not a function.
 */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  // Normalize a bare task function into a task function object.
  const taskFunctionObject: TaskFunctionObject<Data, Response> =
    typeof fn === 'function' ? { taskFunction: fn } : fn
  if (typeof taskFunctionObject.taskFunction !== 'function') {
    throw new TypeError('taskFunction property must be a function')
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionProperties: buildTaskFunctionProperties(
      name,
      taskFunctionObject
    ),
    // Functions are not structured-cloneable: the source text is sent instead.
    taskFunction: taskFunctionObject.taskFunction.toString()
  })
  this.taskFunctions.set(name, taskFunctionObject)
  return opResult
}
852 public async removeTaskFunction (name
: string): Promise
<boolean> {
853 if (!this.taskFunctions
.has(name
)) {
855 'Cannot remove a task function not handled on the pool side'
858 const opResult
= await this.sendTaskFunctionOperationToWorkers({
859 taskFunctionOperation
: 'remove',
860 taskFunctionProperties
: buildTaskFunctionProperties(
862 this.taskFunctions
.get(name
)
865 this.deleteTaskFunctionWorkerUsages(name
)
866 this.taskFunctions
.delete(name
)
/**
 * Lists the task functions properties known by the pool.
 *
 * The list is taken from the first worker node whose info holds a non-empty
 * task functions properties array.
 *
 * @returns The task functions properties of that worker node, or an empty array when no worker node has any.
 */
public listTaskFunctionsProperties (): TaskFunctionProperties[] {
  const workerNodeWithProperties = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionsProperties) &&
      info.taskFunctionsProperties.length > 0
  )
  return workerNodeWithProperties?.info.taskFunctionsProperties ?? []
}
884 * Gets task function strategy, if any.
886 * @param name - The task function name.
887 * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
889 private readonly getTaskFunctionWorkerWorkerChoiceStrategy
= (
891 ): WorkerChoiceStrategy
| undefined => {
893 return this.listTaskFunctionsProperties().find(
894 (taskFunctionProperties
: TaskFunctionProperties
) =>
895 taskFunctionProperties
.name
=== name
/**
 * Sets the default task function of the pool by sending a `'default'` task
 * function operation to the workers.
 *
 * The task function properties are built from the name and the matching entry
 * (if any) of the pool side task functions map.
 *
 * @param name - The name of the task function to use as default.
 * @returns The result of the task function operation sent to the workers.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const taskFunctionProperties = buildTaskFunctionProperties(
    name,
    this.taskFunctions.get(name)
  )
  return await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionProperties
  })
}
/**
 * Deletes, on every worker node, the worker usage tracked for the task
 * function with the given name.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
/**
 * Whether a task shall be executed right away on the worker node with the
 * given key rather than enqueued.
 *
 * That is the case when the worker node tasks queue is empty and the worker
 * node executes fewer tasks than the configured tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed immediately, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing < this.opts.tasksQueueOptions!.concurrency!
}
927 public async execute (
930 transferList
?: readonly TransferListItem
[]
931 ): Promise
<Response
> {
932 return await new Promise
<Response
>((resolve
, reject
) => {
934 reject(new Error('Cannot execute a task on not started pool'))
937 if (this.destroying
) {
938 reject(new Error('Cannot execute a task on destroying pool'))
941 if (name
!= null && typeof name
!== 'string') {
942 reject(new TypeError('name argument must be a string'))
947 typeof name
=== 'string' &&
948 name
.trim().length
=== 0
950 reject(new TypeError('name argument must not be an empty string'))
953 if (transferList
!= null && !Array.isArray(transferList
)) {
954 reject(new TypeError('transferList argument must be an array'))
957 const timestamp
= performance
.now()
958 const workerNodeKey
= this.chooseWorkerNode(
959 this.getTaskFunctionWorkerWorkerChoiceStrategy(name
)
961 const task
: Task
<Data
> = {
962 name
: name
?? DEFAULT_TASK_NAME
,
963 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
964 data
: data
?? ({} as Data
),
969 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
970 this.promiseResponseMap
.set(task
.taskId
!, {
974 ...(this.emitter
!= null && {
975 asyncResource
: new AsyncResource('poolifier:task', {
976 triggerAsyncId
: this.emitter
.asyncId
,
977 requireManualDestroy
: true
982 this.opts
.enableTasksQueue
=== false ||
983 (this.opts
.enableTasksQueue
=== true &&
984 this.shallExecuteTask(workerNodeKey
))
986 this.executeTask(workerNodeKey
, task
)
988 this.enqueueTask(workerNodeKey
, task
)
994 * Starts the minimum number of workers.
996 private startMinimumNumberOfWorkers (): void {
997 this.startingMinimumNumberOfWorkers
= true
999 this.workerNodes
.reduce(
1000 (accumulator
, workerNode
) =>
1001 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
1003 ) < this.minimumNumberOfWorkers
1005 this.createAndSetupWorkerNode()
1007 this.startingMinimumNumberOfWorkers
= false
1011 public start (): void {
1013 throw new Error('Cannot start an already started pool')
1015 if (this.starting
) {
1016 throw new Error('Cannot start an already starting pool')
1018 if (this.destroying
) {
1019 throw new Error('Cannot start a destroying pool')
1021 this.starting
= true
1022 this.startMinimumNumberOfWorkers()
1023 this.starting
= false
1028 public async destroy (): Promise
<void> {
1029 if (!this.started
) {
1030 throw new Error('Cannot destroy an already destroyed pool')
1032 if (this.starting
) {
1033 throw new Error('Cannot destroy an starting pool')
1035 if (this.destroying
) {
1036 throw new Error('Cannot destroy an already destroying pool')
1038 this.destroying
= true
1040 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
1041 await this.destroyWorkerNode(workerNodeKey
)
1044 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
1045 this.emitter
?.emitDestroy()
1046 this.readyEventEmitted
= false
1047 this.destroying
= false
1048 this.started
= false
1051 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
1052 await new Promise
<void>((resolve
, reject
) => {
1053 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1054 if (this.workerNodes
[workerNodeKey
] == null) {
1058 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1059 this.checkMessageWorkerId(message
)
1060 if (message
.kill
=== 'success') {
1062 } else if (message
.kill
=== 'failure') {
1065 `Kill message handling failed on worker ${message.workerId}`
1070 // FIXME: should be registered only once
1071 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1072 this.sendToWorker(workerNodeKey
, { kill
: true })
1077 * Terminates the worker node given its worker node key.
1079 * @param workerNodeKey - The worker node key.
1081 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1082 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1083 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1084 const workerNode
= this.workerNodes
[workerNodeKey
]
1085 await waitWorkerNodeEvents(
1089 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1090 getDefaultTasksQueueOptions(
1091 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1092 ).tasksFinishedTimeout
1094 await this.sendKillMessageToWorker(workerNodeKey
)
1095 await workerNode
.terminate()
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden.
 *
 * The base implementation does nothing.
 */
protected setupHook (): void {
  /* Intentionally empty */
}
1109 * Returns whether the worker is the main worker or not.
1111 * @returns `true` if the worker is the main worker, `false` otherwise.
1113 protected abstract isMain (): boolean
1116 * Hook executed before the worker task execution.
1117 * Can be overridden.
1119 * @param workerNodeKey - The worker node key.
1120 * @param task - The task to execute.
1122 protected beforeTaskExecutionHook (
1123 workerNodeKey
: number,
1126 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1127 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1128 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1129 ++workerUsage
.tasks
.executing
1130 updateWaitTimeWorkerUsage(
1131 this.workerChoiceStrategiesContext
,
1137 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1138 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1139 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(task
.name
!) !=
1142 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1143 const taskFunctionWorkerUsage
= this.workerNodes
[
1145 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1146 ].getTaskFunctionWorkerUsage(task
.name
!)!
1147 ++taskFunctionWorkerUsage
.tasks
.executing
1148 updateWaitTimeWorkerUsage(
1149 this.workerChoiceStrategiesContext
,
1150 taskFunctionWorkerUsage
,
1157 * Hook executed after the worker task execution.
1158 * Can be overridden.
1160 * @param workerNodeKey - The worker node key.
1161 * @param message - The received message.
1163 protected afterTaskExecutionHook (
1164 workerNodeKey
: number,
1165 message
: MessageValue
<Response
>
1167 let needWorkerChoiceStrategyUpdate
= false
1168 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1169 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1170 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1171 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1172 updateRunTimeWorkerUsage(
1173 this.workerChoiceStrategiesContext
,
1177 updateEluWorkerUsage(
1178 this.workerChoiceStrategiesContext
,
1182 needWorkerChoiceStrategyUpdate
= true
1185 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1186 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1187 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1188 message
.taskPerformance
!.name
1191 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1192 const taskFunctionWorkerUsage
= this.workerNodes
[
1194 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1195 ].getTaskFunctionWorkerUsage(message
.taskPerformance
!.name
)!
1196 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1197 updateRunTimeWorkerUsage(
1198 this.workerChoiceStrategiesContext
,
1199 taskFunctionWorkerUsage
,
1202 updateEluWorkerUsage(
1203 this.workerChoiceStrategiesContext
,
1204 taskFunctionWorkerUsage
,
1207 needWorkerChoiceStrategyUpdate
= true
1209 if (needWorkerChoiceStrategyUpdate
) {
1210 this.workerChoiceStrategiesContext
?.update(workerNodeKey
)
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * It shall when the worker info exists and lists more than two task functions
 * properties entries.
 * NOTE(review): the threshold of two presumably accounts for entries every
 * worker always has — confirm against the worker implementation.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionsProperties =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties
  return (
    Array.isArray(taskFunctionsProperties) &&
    taskFunctionsProperties.length > 2
  )
}
1230 * Chooses a worker node for the next task.
1232 * @param workerChoiceStrategy - The worker choice strategy.
1233 * @returns The chosen worker node key
1235 private chooseWorkerNode (
1236 workerChoiceStrategy
?: WorkerChoiceStrategy
1238 if (this.shallCreateDynamicWorker()) {
1239 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1241 this.workerChoiceStrategiesContext
?.getPolicy().dynamicWorkerUsage
===
1244 return workerNodeKey
1247 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1248 return this.workerChoiceStrategiesContext
!.execute(workerChoiceStrategy
)
/**
 * Evaluates the conditions under which a dynamic worker must be created.
 * Implemented by the concrete pool classes.
 *
 * @returns Whether to create a dynamic worker or not.
 */
protected abstract shallCreateDynamicWorker (): boolean
1259 * Sends a message to worker given its worker node key.
1261 * @param workerNodeKey - The worker node key.
1262 * @param message - The message.
1263 * @param transferList - The optional array of transferable objects.
1265 protected abstract sendToWorker (
1266 workerNodeKey
: number,
1267 message
: MessageValue
<Data
>,
1268 transferList
?: readonly TransferListItem
[]
1272 * Creates a new, completely set up worker node.
1274 * @returns New, completely set up worker node key.
1276 protected createAndSetupWorkerNode (): number {
1277 const workerNode
= this.createWorkerNode()
1278 workerNode
.registerWorkerEventHandler(
1280 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1282 workerNode
.registerWorkerEventHandler(
1284 this.opts
.messageHandler
?? EMPTY_FUNCTION
1286 workerNode
.registerWorkerEventHandler(
1288 this.opts
.errorHandler
?? EMPTY_FUNCTION
1290 workerNode
.registerOnceWorkerEventHandler('error', (error
: Error) => {
1291 workerNode
.info
.ready
= false
1292 this.emitter
?.emit(PoolEvents
.error
, error
)
1296 this.opts
.restartWorkerOnError
=== true
1298 if (workerNode
.info
.dynamic
) {
1299 this.createAndSetupDynamicWorkerNode()
1300 } else if (!this.startingMinimumNumberOfWorkers
) {
1301 this.startMinimumNumberOfWorkers()
1307 this.opts
.enableTasksQueue
=== true
1309 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
1311 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1312 workerNode
?.terminate().catch((error
: unknown
) => {
1313 this.emitter
?.emit(PoolEvents
.error
, error
)
1316 workerNode
.registerWorkerEventHandler(
1318 this.opts
.exitHandler
?? EMPTY_FUNCTION
1320 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1321 this.removeWorkerNode(workerNode
)
1324 !this.startingMinimumNumberOfWorkers
&&
1327 this.startMinimumNumberOfWorkers()
1330 const workerNodeKey
= this.addWorkerNode(workerNode
)
1331 this.afterWorkerNodeSetup(workerNodeKey
)
1332 return workerNodeKey
1336 * Creates a new, completely set up dynamic worker node.
1338 * @returns New, completely set up dynamic worker node key.
1340 protected createAndSetupDynamicWorkerNode (): number {
1341 const workerNodeKey
= this.createAndSetupWorkerNode()
1342 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1343 this.checkMessageWorkerId(message
)
1344 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1347 const workerUsage
= this.workerNodes
[localWorkerNodeKey
]?.usage
1348 // Kill message received from worker
1350 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1351 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1352 ((this.opts
.enableTasksQueue
=== false &&
1353 workerUsage
.tasks
.executing
=== 0) ||
1354 (this.opts
.enableTasksQueue
=== true &&
1355 workerUsage
.tasks
.executing
=== 0 &&
1356 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1358 // Flag the worker node as not ready immediately
1359 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1360 this.destroyWorkerNode(localWorkerNodeKey
).catch((error
: unknown
) => {
1361 this.emitter
?.emit(PoolEvents
.error
, error
)
1365 this.sendToWorker(workerNodeKey
, {
1368 if (this.taskFunctions
.size
> 0) {
1369 for (const [taskFunctionName
, taskFunctionObject
] of this.taskFunctions
) {
1370 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1371 taskFunctionOperation
: 'add',
1372 taskFunctionProperties
: buildTaskFunctionProperties(
1376 taskFunction
: taskFunctionObject
.taskFunction
.toString()
1377 }).catch((error
: unknown
) => {
1378 this.emitter
?.emit(PoolEvents
.error
, error
)
1382 const workerNode
= this.workerNodes
[workerNodeKey
]
1383 workerNode
.info
.dynamic
= true
1385 this.workerChoiceStrategiesContext
?.getPolicy().dynamicWorkerReady
===
1387 this.workerChoiceStrategiesContext
?.getPolicy().dynamicWorkerUsage
===
1390 workerNode
.info
.ready
= true
1392 this.checkAndEmitDynamicWorkerCreationEvents()
1393 return workerNodeKey
1397 * Registers a listener callback on the worker given its worker node key.
1399 * @param workerNodeKey - The worker node key.
1400 * @param listener - The message listener callback.
1402 protected abstract registerWorkerMessageListener
<
1403 Message
extends Data
| Response
1405 workerNodeKey
: number,
1406 listener
: (message
: MessageValue
<Message
>) => void
1410 * Registers once a listener callback on the worker given its worker node key.
1412 * @param workerNodeKey - The worker node key.
1413 * @param listener - The message listener callback.
1415 protected abstract registerOnceWorkerMessageListener
<
1416 Message
extends Data
| Response
1418 workerNodeKey
: number,
1419 listener
: (message
: MessageValue
<Message
>) => void
1423 * Deregisters a listener callback on the worker given its worker node key.
1425 * @param workerNodeKey - The worker node key.
1426 * @param listener - The message listener callback.
1428 protected abstract deregisterWorkerMessageListener
<
1429 Message
extends Data
| Response
1431 workerNodeKey
: number,
1432 listener
: (message
: MessageValue
<Message
>) => void
1436 * Method hooked up after a worker node has been newly created.
1437 * Can be overridden.
1439 * @param workerNodeKey - The newly created worker node key.
1441 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1442 // Listen to worker messages.
1443 this.registerWorkerMessageListener(
1445 this.workerMessageListener
1447 // Send the startup message to worker.
1448 this.sendStartupMessageToWorker(workerNodeKey
)
1449 // Send the statistics message to worker.
1450 this.sendStatisticsMessageToWorker(workerNodeKey
)
1451 if (this.opts
.enableTasksQueue
=== true) {
1452 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1453 this.workerNodes
[workerNodeKey
].on(
1455 this.handleWorkerNodeIdleEvent
1458 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1459 this.workerNodes
[workerNodeKey
].on(
1461 this.handleWorkerNodeBackPressureEvent
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Implemented by the concrete pool classes.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1475 * Sends the statistics message to worker given its worker node key.
1477 * @param workerNodeKey - The worker node key.
1479 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1480 this.sendToWorker(workerNodeKey
, {
1483 this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
1484 .runTime
.aggregate
?? false,
1486 this.workerChoiceStrategiesContext
?.getTaskStatisticsRequirements()
1487 .elu
.aggregate
?? false
/**
 * Tells whether task stealing is currently impossible.
 *
 * @returns `true` if the pool has at most one worker node or no queued tasks, `false` otherwise.
 */
private cannotStealTask (): boolean {
  // With at most one worker node there is no other node to steal from.
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
/**
 * Either executes the task on the worker node or enqueues it, depending on
 * what `shallExecuteTask()` answers for that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1504 private redistributeQueuedTasks (workerNodeKey
: number): void {
1505 if (workerNodeKey
=== -1 || this.cannotStealTask()) {
1508 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1509 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1510 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1511 return workerNode
.info
.ready
&&
1512 workerNode
.usage
.tasks
.queued
<
1513 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1520 destinationWorkerNodeKey
,
1521 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1522 this.dequeueTask(workerNodeKey
)!
1527 private updateTaskStolenStatisticsWorkerUsage (
1528 workerNodeKey
: number,
1531 const workerNode
= this.workerNodes
[workerNodeKey
]
1532 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1533 if (workerNode
?.usage
!= null) {
1534 ++workerNode
.usage
.tasks
.stolen
1537 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1538 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1540 const taskFunctionWorkerUsage
=
1541 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1542 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1543 ++taskFunctionWorkerUsage
.tasks
.stolen
/**
 * Increments the sequentially stolen tasks counter of a worker node usage.
 * Does nothing when the worker node or its usage is missing.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const usage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (usage == null) {
    return
  }
  usage.tasks.sequentiallyStolen += 1
}
1557 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1558 workerNodeKey
: number,
1561 const workerNode
= this.workerNodes
[workerNodeKey
]
1563 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1564 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1566 const taskFunctionWorkerUsage
=
1567 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1568 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1569 ++taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node usage.
 * Does nothing when the worker node or its usage is missing.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const usage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (usage == null) {
    return
  }
  usage.tasks.sequentiallyStolen = 0
}
1583 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1584 workerNodeKey
: number,
1587 const workerNode
= this.workerNodes
[workerNodeKey
]
1589 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1590 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1592 const taskFunctionWorkerUsage
=
1593 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1594 workerNode
.getTaskFunctionWorkerUsage(taskName
)!
1595 taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
= 0
1599 private readonly handleWorkerNodeIdleEvent
= (
1600 eventDetail
: WorkerNodeEventDetail
,
1601 previousStolenTask
?: Task
<Data
>
1603 const { workerNodeKey
} = eventDetail
1604 if (workerNodeKey
== null) {
1606 "WorkerNode event detail 'workerNodeKey' property must be defined"
1609 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1611 this.cannotStealTask() ||
1612 (this.info
.stealingWorkerNodes
?? 0) >
1613 Math.floor(this.workerNodes
.length
/ 2)
1615 if (workerInfo
!= null && previousStolenTask
!= null) {
1616 workerInfo
.stealing
= false
1620 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1622 workerInfo
!= null &&
1623 previousStolenTask
!= null &&
1624 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1625 (workerNodeTasksUsage
.executing
> 0 ||
1626 this.tasksQueueSize(workerNodeKey
) > 0)
1628 workerInfo
.stealing
= false
1629 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1630 for (const taskFunctionProperties
of workerInfo
.taskFunctionsProperties
!) {
1631 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1633 taskFunctionProperties
.name
1636 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1639 if (workerInfo
== null) {
1641 `Worker node with key '${workerNodeKey}' not found in pool`
1644 workerInfo
.stealing
= true
1645 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1647 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1650 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1651 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1653 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1654 ].getTaskFunctionWorkerUsage(stolenTask
.name
!)!.tasks
1656 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1657 (previousStolenTask
!= null &&
1658 previousStolenTask
.name
=== stolenTask
.name
&&
1659 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1661 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1663 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1667 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1669 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1674 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1676 this.handleWorkerNodeIdleEvent(eventDetail
, stolenTask
)
1679 .catch((error
: unknown
) => {
1680 this.emitter
?.emit(PoolEvents
.error
, error
)
1684 private readonly workerNodeStealTask
= (
1685 workerNodeKey
: number
1686 ): Task
<Data
> | undefined => {
1687 const workerNodes
= this.workerNodes
1690 (workerNodeA
, workerNodeB
) =>
1691 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1693 const sourceWorkerNode
= workerNodes
.find(
1694 (sourceWorkerNode
, sourceWorkerNodeKey
) =>
1695 sourceWorkerNode
.info
.ready
&&
1696 !sourceWorkerNode
.info
.stealing
&&
1697 sourceWorkerNodeKey
!== workerNodeKey
&&
1698 sourceWorkerNode
.usage
.tasks
.queued
> 0
1700 if (sourceWorkerNode
!= null) {
1701 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1702 const task
= sourceWorkerNode
.popTask()!
1703 this.handleTask(workerNodeKey
, task
)
1704 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1705 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1706 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey
, task
.name
!)
1711 private readonly handleWorkerNodeBackPressureEvent
= (
1712 eventDetail
: WorkerNodeEventDetail
1715 this.cannotStealTask() ||
1716 (this.info
.stealingWorkerNodes
?? 0) >
1717 Math.floor(this.workerNodes
.length
/ 2)
1721 const { workerId
} = eventDetail
1722 const sizeOffset
= 1
1723 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1724 if (this.opts
.tasksQueueOptions
!.size
! <= sizeOffset
) {
1727 const sourceWorkerNode
=
1728 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1729 const workerNodes
= this.workerNodes
1732 (workerNodeA
, workerNodeB
) =>
1733 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1735 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1737 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1738 workerNode
.info
.ready
&&
1739 !workerNode
.info
.stealing
&&
1740 workerNode
.info
.id
!== workerId
&&
1741 workerNode
.usage
.tasks
.queued
<
1742 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1743 this.opts
.tasksQueueOptions
!.size
! - sizeOffset
1745 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1746 if (workerInfo
== null) {
1748 `Worker node with key '${workerNodeKey}' not found in pool`
1751 workerInfo
.stealing
= true
1752 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1753 const task
= sourceWorkerNode
.popTask()!
1754 this.handleTask(workerNodeKey
, task
)
1755 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1756 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey
, task
.name
!)
1757 workerInfo
.stealing
= false
/**
 * Message listener registered on each worker. Dispatches on the message content:
 * - `ready` and `taskFunctionsProperties` set: worker ready response.
 * - only `taskFunctionsProperties` set: refresh of the worker task functions properties.
 * - otherwise, `taskId` set: task execution response.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionsProperties } = message
  if (taskFunctionsProperties == null) {
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
    }
    return
  }
  if (ready != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  // Task function properties message received from worker
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo != null) {
    workerInfo.taskFunctionsProperties = taskFunctionsProperties
  }
}
/**
 * Emits the pool ready event once: only if it has not been emitted yet and
 * the pool reports itself as ready.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
/**
 * Handles the ready response sent by a worker: records the ready flag and the
 * task functions properties on the matching worker node, then checks whether
 * the pool ready event can be emitted.
 *
 * @param message - The ready response message received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker did not report itself as ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionsProperties } = message
  if (!(ready != null && ready)) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const readyWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  readyWorkerNode.info.ready = ready
  readyWorkerNode.info.taskFunctionsProperties = taskFunctionsProperties
  this.checkAndEmitReadyEvent()
}
1806 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1807 const { workerId
, taskId
, workerError
, data
} = message
1808 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1809 const promiseResponse
= this.promiseResponseMap
.get(taskId
!)
1810 if (promiseResponse
!= null) {
1811 const { resolve
, reject
, workerNodeKey
, asyncResource
} = promiseResponse
1812 const workerNode
= this.workerNodes
[workerNodeKey
]
1813 if (workerError
!= null) {
1814 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1815 asyncResource
!= null
1816 ? asyncResource
.runInAsyncScope(
1821 : reject(workerError
.message
)
1823 asyncResource
!= null
1824 ? asyncResource
.runInAsyncScope(resolve
, this.emitter
, data
)
1825 : resolve(data
as Response
)
1827 asyncResource
?.emitDestroy()
1828 this.afterTaskExecutionHook(workerNodeKey
, message
)
1829 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1830 this.promiseResponseMap
.delete(taskId
!)
1831 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1832 workerNode
?.emit('taskFinished', taskId
)
1834 this.opts
.enableTasksQueue
=== true &&
1836 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1839 const workerNodeTasksUsage
= workerNode
.usage
.tasks
1841 this.tasksQueueSize(workerNodeKey
) > 0 &&
1842 workerNodeTasksUsage
.executing
<
1843 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1844 this.opts
.tasksQueueOptions
!.concurrency
!
1846 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1847 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
1850 workerNodeTasksUsage
.executing
=== 0 &&
1851 this.tasksQueueSize(workerNodeKey
) === 0 &&
1852 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1854 workerNode
.emit('idle', {
1863 private checkAndEmitTaskExecutionEvents (): void {
1865 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
/**
 * Emits the pool back pressure event when `hasBackPressure()` reports back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
/**
 * Emits the events related to dynamic worker creation.
 * Implemented by the concrete pool classes.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
 * Looks up the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` if no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerInfo = this.workerNodes[workerNodeKey]?.info
  return workerInfo
}
1891 * Creates a worker node.
1893 * @returns The created worker node.
1895 private createWorkerNode (): IWorkerNode
<Worker
, Data
> {
1896 const workerNode
= new WorkerNode
<Worker
, Data
>(
1901 workerOptions
: this.opts
.workerOptions
,
1902 tasksQueueBackPressureSize
:
1903 this.opts
.tasksQueueOptions
?.size
??
1904 getDefaultTasksQueueOptions(
1905 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1909 // Flag the worker node as ready at pool startup.
1910 if (this.starting
) {
1911 workerNode
.info
.ready
= true
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  // The key is the position of the node in the worker nodes array.
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1932 private checkAndEmitEmptyEvent (): void {
1934 this.emitter
?.emit(PoolEvents
.empty
, this.info
)
1935 this.readyEventEmitted
= false
/**
 * Removes the worker node from the pool worker nodes and from the worker
 * choice strategies context when it is found, then checks whether the pool
 * empty event shall be emitted.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const found = removedWorkerNodeKey !== -1
  if (found) {
    this.workerNodes.splice(removedWorkerNodeKey, 1)
    this.workerChoiceStrategiesContext?.remove(removedWorkerNodeKey)
  }
  // The empty event check runs whether or not a node was removed.
  this.checkAndEmitEmptyEvent()
}
/**
 * Marks the worker node as not ready. Does nothing if no worker information
 * exists for the given key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
1960 private hasBackPressure (): boolean {
1962 this.opts
.enableTasksQueue
=== true &&
1963 this.workerNodes
.findIndex(
1964 workerNode
=> !workerNode
.hasBackPressure()
/**
 * Runs the given task on the worker identified by its worker node key:
 * calls the before task execution hook, sends the task to the worker along
 * with its transfer list, then checks the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // Read the transfer list only after the hook ran, as the original code does.
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the task on the worker node, then checks the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` (presumably the resulting tasks queue size).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
/**
 * Dequeues a task from the worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value reported by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const queriedWorkerNode = this.workerNodes[workerNodeKey]
  return queriedWorkerNode.tasksQueueSize()
}
1995 protected flushTasksQueue (workerNodeKey
: number): number {
1996 let flushedTasks
= 0
1997 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1998 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1999 this.executeTask(workerNodeKey
, this.dequeueTask(workerNodeKey
)!)
2002 this.workerNodes
[workerNodeKey
].clearTasksQueue()
2006 private flushTasksQueues (): void {
2007 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
2008 this.flushTasksQueue(workerNodeKey
)