1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import { type TransferListItem
} from
'node:worker_threads'
6 PromiseResponseWrapper
,
8 } from
'../utility-types'
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
20 updateMeasurementStatistics
22 import { KillBehaviors
} from
'../worker/worker-options'
23 import type { TaskFunction
} from
'../worker/task-functions'
32 type TasksQueueOptions
42 type MeasurementStatisticsRequirements
,
44 WorkerChoiceStrategies
,
45 type WorkerChoiceStrategy
,
46 type WorkerChoiceStrategyOptions
47 } from
'./selection-strategies/selection-strategies-types'
48 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
49 import { version
} from
'./version'
50 import { WorkerNode
} from
'./worker-node'
53 checkValidTasksQueueOptions
,
54 checkValidWorkerChoiceStrategy
58 * Base class that implements some shared logic for all poolifier pools.
60 * @typeParam Worker - Type of worker which manages this pool.
61 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
62 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
64 export abstract class AbstractPool
<
65 Worker
extends IWorker
,
68 > implements IPool
<Worker
, Data
, Response
> {
70 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
73 public readonly emitter
?: PoolEmitter
76 * The task execution response promise map:
77 * - `key`: The message id of each submitted task.
78 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
80 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
82 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
83 new Map
<string, PromiseResponseWrapper
<Response
>>()
86 * Worker choice strategy context referencing a worker choice algorithm implementation.
88 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
95 * Dynamic pool maximum size property placeholder.
97 protected readonly max
?: number
100 * The task functions added at runtime map:
101 * - `key`: The task function name.
102 * - `value`: The task function itself.
104 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
107 * Whether the pool is started or not.
109 private started
: boolean
111 * Whether the pool is starting or not.
113 private starting
: boolean
115 * The start timestamp of the pool.
117 private readonly startTimestamp
120 * Constructs a new poolifier pool.
122 * @param numberOfWorkers - Number of workers that this pool should manage.
123 * @param filePath - Path to the worker file.
124 * @param opts - Options for the pool.
127 protected readonly numberOfWorkers
: number,
128 protected readonly filePath
: string,
129 protected readonly opts
: PoolOptions
<Worker
>
131 if (!this.isMain()) {
133 'Cannot start a pool from a worker with the same type as the pool'
136 this.checkNumberOfWorkers(this.numberOfWorkers
)
137 checkFilePath(this.filePath
)
138 this.checkPoolOptions(this.opts
)
140 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
141 this.executeTask
= this.executeTask
.bind(this)
142 this.enqueueTask
= this.enqueueTask
.bind(this)
144 if (this.opts
.enableEvents
=== true) {
145 this.emitter
= new PoolEmitter()
147 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
153 this.opts
.workerChoiceStrategy
,
154 this.opts
.workerChoiceStrategyOptions
159 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
162 this.starting
= false
163 if (this.opts
.startWorkers
=== true) {
167 this.startTimestamp
= performance
.now()
170 private checkNumberOfWorkers (numberOfWorkers
: number): void {
171 if (numberOfWorkers
== null) {
173 'Cannot instantiate a pool without specifying the number of workers'
175 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
177 'Cannot instantiate a pool with a non safe integer number of workers'
179 } else if (numberOfWorkers
< 0) {
180 throw new RangeError(
181 'Cannot instantiate a pool with a negative number of workers'
183 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
184 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
188 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
189 if (isPlainObject(opts
)) {
190 this.opts
.startWorkers
= opts
.startWorkers
?? true
191 checkValidWorkerChoiceStrategy(
192 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
194 this.opts
.workerChoiceStrategy
=
195 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
196 this.checkValidWorkerChoiceStrategyOptions(
197 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
199 this.opts
.workerChoiceStrategyOptions
= {
200 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
201 ...opts
.workerChoiceStrategyOptions
203 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
204 this.opts
.enableEvents
= opts
.enableEvents
?? true
205 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
206 if (this.opts
.enableTasksQueue
) {
207 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
208 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
209 opts
.tasksQueueOptions
as TasksQueueOptions
213 throw new TypeError('Invalid pool options: must be a plain object')
217 private checkValidWorkerChoiceStrategyOptions (
218 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
221 workerChoiceStrategyOptions
!= null &&
222 !isPlainObject(workerChoiceStrategyOptions
)
225 'Invalid worker choice strategy options: must be a plain object'
229 workerChoiceStrategyOptions
?.retries
!= null &&
230 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
233 'Invalid worker choice strategy options: retries must be an integer'
237 workerChoiceStrategyOptions
?.retries
!= null &&
238 workerChoiceStrategyOptions
.retries
< 0
240 throw new RangeError(
241 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
245 workerChoiceStrategyOptions
?.weights
!= null &&
246 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
249 'Invalid worker choice strategy options: must have a weight for each worker node'
253 workerChoiceStrategyOptions
?.measurement
!= null &&
254 !Object.values(Measurements
).includes(
255 workerChoiceStrategyOptions
.measurement
259 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
265 public get
info (): PoolInfo
{
270 started
: this.started
,
272 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
273 minSize
: this.minSize
,
274 maxSize
: this.maxSize
,
275 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
276 .runTime
.aggregate
&&
277 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
278 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
279 workerNodes
: this.workerNodes
.length
,
280 idleWorkerNodes
: this.workerNodes
.reduce(
281 (accumulator
, workerNode
) =>
282 workerNode
.usage
.tasks
.executing
=== 0
287 busyWorkerNodes
: this.workerNodes
.reduce(
288 (accumulator
, workerNode
) =>
289 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
292 executedTasks
: this.workerNodes
.reduce(
293 (accumulator
, workerNode
) =>
294 accumulator
+ workerNode
.usage
.tasks
.executed
,
297 executingTasks
: this.workerNodes
.reduce(
298 (accumulator
, workerNode
) =>
299 accumulator
+ workerNode
.usage
.tasks
.executing
,
302 ...(this.opts
.enableTasksQueue
=== true && {
303 queuedTasks
: this.workerNodes
.reduce(
304 (accumulator
, workerNode
) =>
305 accumulator
+ workerNode
.usage
.tasks
.queued
,
309 ...(this.opts
.enableTasksQueue
=== true && {
310 maxQueuedTasks
: this.workerNodes
.reduce(
311 (accumulator
, workerNode
) =>
312 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
316 ...(this.opts
.enableTasksQueue
=== true && {
317 backPressure
: this.hasBackPressure()
319 ...(this.opts
.enableTasksQueue
=== true && {
320 stolenTasks
: this.workerNodes
.reduce(
321 (accumulator
, workerNode
) =>
322 accumulator
+ workerNode
.usage
.tasks
.stolen
,
326 failedTasks
: this.workerNodes
.reduce(
327 (accumulator
, workerNode
) =>
328 accumulator
+ workerNode
.usage
.tasks
.failed
,
331 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
332 .runTime
.aggregate
&& {
336 ...this.workerNodes
.map(
337 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
343 ...this.workerNodes
.map(
344 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
348 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
349 .runTime
.average
&& {
352 this.workerNodes
.reduce
<number[]>(
353 (accumulator
, workerNode
) =>
354 accumulator
.concat(workerNode
.usage
.runTime
.history
),
360 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
364 this.workerNodes
.reduce
<number[]>(
365 (accumulator
, workerNode
) =>
366 accumulator
.concat(workerNode
.usage
.runTime
.history
),
374 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
375 .waitTime
.aggregate
&& {
379 ...this.workerNodes
.map(
380 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
386 ...this.workerNodes
.map(
387 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
391 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
392 .waitTime
.average
&& {
395 this.workerNodes
.reduce
<number[]>(
396 (accumulator
, workerNode
) =>
397 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
403 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
404 .waitTime
.median
&& {
407 this.workerNodes
.reduce
<number[]>(
408 (accumulator
, workerNode
) =>
409 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
421 * The pool readiness boolean status.
423 private get
ready (): boolean {
425 this.workerNodes
.reduce(
426 (accumulator
, workerNode
) =>
427 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
436 * The approximate pool utilization.
438 * @returns The pool utilization.
440 private get
utilization (): number {
441 const poolTimeCapacity
=
442 (performance
.now() - this.startTimestamp
) * this.maxSize
443 const totalTasksRunTime
= this.workerNodes
.reduce(
444 (accumulator
, workerNode
) =>
445 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
448 const totalTasksWaitTime
= this.workerNodes
.reduce(
449 (accumulator
, workerNode
) =>
450 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
453 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
459 * If it is `'dynamic'`, it provides the `max` property.
461 protected abstract get
type (): PoolType
466 protected abstract get
worker (): WorkerType
469 * The pool minimum size.
471 protected get
minSize (): number {
472 return this.numberOfWorkers
476 * The pool maximum size.
478 protected get
maxSize (): number {
479 return this.max
?? this.numberOfWorkers
483 * Checks if the worker id sent in the received message from a worker is valid.
485 * @param message - The received message.
486 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
488 private checkMessageWorkerId (message
: MessageValue
<Response
>): void {
489 if (message
.workerId
== null) {
490 throw new Error('Worker message received without worker id')
492 message
.workerId
!= null &&
493 this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1
496 `Worker message received from unknown worker '${message.workerId}'`
502 * Gets the given worker its worker node key.
504 * @param worker - The worker.
505 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
507 private getWorkerNodeKeyByWorker (worker
: Worker
): number {
508 return this.workerNodes
.findIndex(
509 workerNode
=> workerNode
.worker
=== worker
514 * Gets the worker node key given its worker id.
516 * @param workerId - The worker id.
517 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
519 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
520 return this.workerNodes
.findIndex(
521 workerNode
=> workerNode
.info
.id
=== workerId
526 public setWorkerChoiceStrategy (
527 workerChoiceStrategy
: WorkerChoiceStrategy
,
528 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
530 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
531 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
532 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
533 this.opts
.workerChoiceStrategy
535 if (workerChoiceStrategyOptions
!= null) {
536 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
538 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
539 workerNode
.resetUsage()
540 this.sendStatisticsMessageToWorker(workerNodeKey
)
545 public setWorkerChoiceStrategyOptions (
546 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
548 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
549 this.opts
.workerChoiceStrategyOptions
= {
550 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
551 ...workerChoiceStrategyOptions
553 this.workerChoiceStrategyContext
.setOptions(
554 this.opts
.workerChoiceStrategyOptions
559 public enableTasksQueue (
561 tasksQueueOptions
?: TasksQueueOptions
563 if (this.opts
.enableTasksQueue
=== true && !enable
) {
564 this.unsetTaskStealing()
565 this.unsetTasksStealingOnBackPressure()
566 this.flushTasksQueues()
568 this.opts
.enableTasksQueue
= enable
569 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
573 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
574 if (this.opts
.enableTasksQueue
=== true) {
575 checkValidTasksQueueOptions(tasksQueueOptions
)
576 this.opts
.tasksQueueOptions
=
577 this.buildTasksQueueOptions(tasksQueueOptions
)
578 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
579 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
580 this.setTaskStealing()
582 this.unsetTaskStealing()
584 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
585 this.setTasksStealingOnBackPressure()
587 this.unsetTasksStealingOnBackPressure()
589 } else if (this.opts
.tasksQueueOptions
!= null) {
590 delete this.opts
.tasksQueueOptions
594 private buildTasksQueueOptions (
595 tasksQueueOptions
: TasksQueueOptions
596 ): TasksQueueOptions
{
599 size
: Math.pow(this.maxSize
, 2),
602 tasksStealingOnBackPressure
: true
608 private setTasksQueueSize (size
: number): void {
609 for (const workerNode
of this.workerNodes
) {
610 workerNode
.tasksQueueBackPressureSize
= size
614 private setTaskStealing (): void {
615 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
616 this.workerNodes
[workerNodeKey
].onEmptyQueue
=
617 this.taskStealingOnEmptyQueue
.bind(this)
621 private unsetTaskStealing (): void {
622 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
623 delete this.workerNodes
[workerNodeKey
].onEmptyQueue
627 private setTasksStealingOnBackPressure (): void {
628 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
629 this.workerNodes
[workerNodeKey
].onBackPressure
=
630 this.tasksStealingOnBackPressure
.bind(this)
634 private unsetTasksStealingOnBackPressure (): void {
635 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
636 delete this.workerNodes
[workerNodeKey
].onBackPressure
641 * Whether the pool is full or not.
643 * The pool filling boolean status.
645 protected get
full (): boolean {
646 return this.workerNodes
.length
>= this.maxSize
650 * Whether the pool is busy or not.
652 * The pool busyness boolean status.
654 protected abstract get
busy (): boolean
657 * Whether worker nodes are executing concurrently their tasks quota or not.
659 * @returns Worker nodes busyness boolean status.
661 protected internalBusy (): boolean {
662 if (this.opts
.enableTasksQueue
=== true) {
664 this.workerNodes
.findIndex(
666 workerNode
.info
.ready
&&
667 workerNode
.usage
.tasks
.executing
<
668 (this.opts
.tasksQueueOptions
?.concurrency
as number)
673 this.workerNodes
.findIndex(
675 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
680 private async sendTaskFunctionOperationToWorker (
681 workerNodeKey
: number,
682 message
: MessageValue
<Data
>
683 ): Promise
<boolean> {
684 return await new Promise
<boolean>((resolve
, reject
) => {
685 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
686 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
688 message
.workerId
=== workerId
&&
689 message
.taskFunctionOperationStatus
=== true
693 message
.workerId
=== workerId
&&
694 message
.taskFunctionOperationStatus
=== false
698 `Task function operation '${
699 message.taskFunctionOperation as string
700 }' failed on worker ${message.workerId} with error: '${
701 message.workerError?.message as string
707 this.sendToWorker(workerNodeKey
, message
)
711 private async sendTaskFunctionOperationToWorkers (
712 message
: MessageValue
<Data
>
713 ): Promise
<boolean> {
714 return await new Promise
<boolean>((resolve
, reject
) => {
715 const responsesReceived
= new Array<MessageValue
<Data
| Response
>>()
716 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
717 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
718 if (message
.taskFunctionOperationStatus
!= null) {
719 responsesReceived
.push(message
)
721 responsesReceived
.length
=== this.workerNodes
.length
&&
722 responsesReceived
.every(
723 message
=> message
.taskFunctionOperationStatus
=== true
728 responsesReceived
.length
=== this.workerNodes
.length
&&
729 responsesReceived
.some(
730 message
=> message
.taskFunctionOperationStatus
=== false
733 const errorResponse
= responsesReceived
.find(
734 response
=> response
.taskFunctionOperationStatus
=== false
738 `Task function operation '${
739 message.taskFunctionOperation as string
740 }' failed on worker ${
741 errorResponse?.workerId as number
743 errorResponse?.workerError?.message as string
750 this.sendToWorker(workerNodeKey
, message
)
756 public hasTaskFunction (name
: string): boolean {
757 for (const workerNode
of this.workerNodes
) {
759 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
760 workerNode
.info
.taskFunctionNames
.includes(name
)
769 public async addTaskFunction (
771 fn
: TaskFunction
<Data
, Response
>
772 ): Promise
<boolean> {
773 if (typeof name
!== 'string') {
774 throw new TypeError('name argument must be a string')
776 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
777 throw new TypeError('name argument must not be an empty string')
779 if (typeof fn
!== 'function') {
780 throw new TypeError('fn argument must be a function')
782 const opResult
= await this.sendTaskFunctionOperationToWorkers({
783 taskFunctionOperation
: 'add',
784 taskFunctionName
: name
,
785 taskFunction
: fn
.toString()
787 this.taskFunctions
.set(name
, fn
)
792 public async removeTaskFunction (name
: string): Promise
<boolean> {
793 if (!this.taskFunctions
.has(name
)) {
795 'Cannot remove a task function not handled on the pool side'
798 const opResult
= await this.sendTaskFunctionOperationToWorkers({
799 taskFunctionOperation
: 'remove',
800 taskFunctionName
: name
802 this.deleteTaskFunctionWorkerUsages(name
)
803 this.taskFunctions
.delete(name
)
808 public listTaskFunctionNames (): string[] {
809 for (const workerNode
of this.workerNodes
) {
811 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
812 workerNode
.info
.taskFunctionNames
.length
> 0
814 return workerNode
.info
.taskFunctionNames
821 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
822 return await this.sendTaskFunctionOperationToWorkers({
823 taskFunctionOperation
: 'default',
824 taskFunctionName
: name
828 private deleteTaskFunctionWorkerUsages (name
: string): void {
829 for (const workerNode
of this.workerNodes
) {
830 workerNode
.deleteTaskFunctionWorkerUsage(name
)
834 private shallExecuteTask (workerNodeKey
: number): boolean {
836 this.tasksQueueSize(workerNodeKey
) === 0 &&
837 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
838 (this.opts
.tasksQueueOptions
?.concurrency
as number)
843 public async execute (
846 transferList
?: TransferListItem
[]
847 ): Promise
<Response
> {
848 return await new Promise
<Response
>((resolve
, reject
) => {
850 reject(new Error('Cannot execute a task on not started pool'))
853 if (name
!= null && typeof name
!== 'string') {
854 reject(new TypeError('name argument must be a string'))
859 typeof name
=== 'string' &&
860 name
.trim().length
=== 0
862 reject(new TypeError('name argument must not be an empty string'))
865 if (transferList
!= null && !Array.isArray(transferList
)) {
866 reject(new TypeError('transferList argument must be an array'))
869 const timestamp
= performance
.now()
870 const workerNodeKey
= this.chooseWorkerNode()
871 const task
: Task
<Data
> = {
872 name
: name
?? DEFAULT_TASK_NAME
,
873 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
874 data
: data
?? ({} as Data
),
879 this.promiseResponseMap
.set(task
.taskId
as string, {
885 this.opts
.enableTasksQueue
=== false ||
886 (this.opts
.enableTasksQueue
=== true &&
887 this.shallExecuteTask(workerNodeKey
))
889 this.executeTask(workerNodeKey
, task
)
891 this.enqueueTask(workerNodeKey
, task
)
897 public start (): void {
900 this.workerNodes
.reduce(
901 (accumulator
, workerNode
) =>
902 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
904 ) < this.numberOfWorkers
906 this.createAndSetupWorkerNode()
908 this.starting
= false
913 public async destroy (): Promise
<void> {
915 this.workerNodes
.map(async (_
, workerNodeKey
) => {
916 await this.destroyWorkerNode(workerNodeKey
)
919 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
923 protected async sendKillMessageToWorker (
924 workerNodeKey
: number
926 await new Promise
<void>((resolve
, reject
) => {
927 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
928 if (message
.kill
=== 'success') {
930 } else if (message
.kill
=== 'failure') {
934 message.workerId as number
935 } kill message handling failed`
940 this.sendToWorker(workerNodeKey
, { kill
: true })
945 * Terminates the worker node given its worker node key.
947 * @param workerNodeKey - The worker node key.
949 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
952 * Setup hook to execute code before worker nodes are created in the abstract constructor.
957 protected setupHook (): void {
958 /* Intentionally empty */
962 * Should return whether the worker is the main worker or not.
964 protected abstract isMain (): boolean
967 * Hook executed before the worker task execution.
970 * @param workerNodeKey - The worker node key.
971 * @param task - The task to execute.
973 protected beforeTaskExecutionHook (
974 workerNodeKey
: number,
977 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
978 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
979 ++workerUsage
.tasks
.executing
980 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
983 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
984 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
988 const taskFunctionWorkerUsage
= this.workerNodes
[
990 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
991 ++taskFunctionWorkerUsage
.tasks
.executing
992 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
997 * Hook executed after the worker task execution.
1000 * @param workerNodeKey - The worker node key.
1001 * @param message - The received message.
1003 protected afterTaskExecutionHook (
1004 workerNodeKey
: number,
1005 message
: MessageValue
<Response
>
1007 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1008 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1009 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1010 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1011 this.updateEluWorkerUsage(workerUsage
, message
)
1014 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1015 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1016 message
.taskPerformance
?.name
as string
1019 const taskFunctionWorkerUsage
= this.workerNodes
[
1021 ].getTaskFunctionWorkerUsage(
1022 message
.taskPerformance
?.name
as string
1024 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1025 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1026 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
1031 * Whether the worker node shall update its task function worker usage or not.
1033 * @param workerNodeKey - The worker node key.
1034 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1036 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1037 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1039 workerInfo
!= null &&
1040 Array.isArray(workerInfo
.taskFunctionNames
) &&
1041 workerInfo
.taskFunctionNames
.length
> 2
1045 private updateTaskStatisticsWorkerUsage (
1046 workerUsage
: WorkerUsage
,
1047 message
: MessageValue
<Response
>
1049 const workerTaskStatistics
= workerUsage
.tasks
1051 workerTaskStatistics
.executing
!= null &&
1052 workerTaskStatistics
.executing
> 0
1054 --workerTaskStatistics
.executing
1056 if (message
.workerError
== null) {
1057 ++workerTaskStatistics
.executed
1059 ++workerTaskStatistics
.failed
1063 private updateRunTimeWorkerUsage (
1064 workerUsage
: WorkerUsage
,
1065 message
: MessageValue
<Response
>
1067 if (message
.workerError
!= null) {
1070 updateMeasurementStatistics(
1071 workerUsage
.runTime
,
1072 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1073 message
.taskPerformance
?.runTime
?? 0
1077 private updateWaitTimeWorkerUsage (
1078 workerUsage
: WorkerUsage
,
1081 const timestamp
= performance
.now()
1082 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1083 updateMeasurementStatistics(
1084 workerUsage
.waitTime
,
1085 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1090 private updateEluWorkerUsage (
1091 workerUsage
: WorkerUsage
,
1092 message
: MessageValue
<Response
>
1094 if (message
.workerError
!= null) {
1097 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1098 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1099 updateMeasurementStatistics(
1100 workerUsage
.elu
.active
,
1101 eluTaskStatisticsRequirements
,
1102 message
.taskPerformance
?.elu
?.active
?? 0
1104 updateMeasurementStatistics(
1105 workerUsage
.elu
.idle
,
1106 eluTaskStatisticsRequirements
,
1107 message
.taskPerformance
?.elu
?.idle
?? 0
1109 if (eluTaskStatisticsRequirements
.aggregate
) {
1110 if (message
.taskPerformance
?.elu
!= null) {
1111 if (workerUsage
.elu
.utilization
!= null) {
1112 workerUsage
.elu
.utilization
=
1113 (workerUsage
.elu
.utilization
+
1114 message
.taskPerformance
.elu
.utilization
) /
1117 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
1124 * Chooses a worker node for the next task.
1126 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1128 * @returns The chosen worker node key
1130 private chooseWorkerNode (): number {
1131 if (this.shallCreateDynamicWorker()) {
1132 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1134 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1136 return workerNodeKey
1139 return this.workerChoiceStrategyContext
.execute()
1143 * Conditions for dynamic worker creation.
1145 * @returns Whether to create a dynamic worker or not.
1147 private shallCreateDynamicWorker (): boolean {
1148 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
1152 * Sends a message to worker given its worker node key.
1154 * @param workerNodeKey - The worker node key.
1155 * @param message - The message.
1156 * @param transferList - The optional array of transferable objects.
1158 protected abstract sendToWorker (
1159 workerNodeKey
: number,
1160 message
: MessageValue
<Data
>,
1161 transferList
?: TransferListItem
[]
1165 * Creates a new worker.
1167 * @returns Newly created worker.
1169 protected abstract createWorker (): Worker
1172 * Creates a new, completely set up worker node.
1174 * @returns New, completely set up worker node key.
1176 protected createAndSetupWorkerNode (): number {
1177 const worker
= this.createWorker()
1179 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1180 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1181 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1182 worker
.on('error', error
=> {
1183 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1184 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1185 workerInfo
.ready
= false
1186 this.workerNodes
[workerNodeKey
].closeChannel()
1187 this.emitter
?.emit(PoolEvents
.error
, error
)
1191 this.opts
.restartWorkerOnError
=== true
1193 if (workerInfo
.dynamic
) {
1194 this.createAndSetupDynamicWorkerNode()
1196 this.createAndSetupWorkerNode()
1199 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1200 this.redistributeQueuedTasks(workerNodeKey
)
1203 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1204 worker
.once('exit', () => {
1205 this.removeWorkerNode(worker
)
1208 const workerNodeKey
= this.addWorkerNode(worker
)
1210 this.afterWorkerNodeSetup(workerNodeKey
)
1212 return workerNodeKey
1216 * Creates a new, completely set up dynamic worker node.
1218 * @returns New, completely set up dynamic worker node key.
1220 protected createAndSetupDynamicWorkerNode (): number {
1221 const workerNodeKey
= this.createAndSetupWorkerNode()
1222 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1223 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1226 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1227 // Kill message received from worker
1229 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1230 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1231 ((this.opts
.enableTasksQueue
=== false &&
1232 workerUsage
.tasks
.executing
=== 0) ||
1233 (this.opts
.enableTasksQueue
=== true &&
1234 workerUsage
.tasks
.executing
=== 0 &&
1235 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1237 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1238 this.emitter
?.emit(PoolEvents
.error
, error
)
1242 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1243 this.sendToWorker(workerNodeKey
, {
1246 if (this.taskFunctions
.size
> 0) {
1247 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1248 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1249 taskFunctionOperation
: 'add',
1251 taskFunction
: taskFunction
.toString()
1253 this.emitter
?.emit(PoolEvents
.error
, error
)
1257 workerInfo
.dynamic
= true
1259 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1260 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1262 workerInfo
.ready
= true
1264 this.checkAndEmitDynamicWorkerCreationEvents()
1265 return workerNodeKey
1269 * Registers a listener callback on the worker given its worker node key.
1271 * @param workerNodeKey - The worker node key.
1272 * @param listener - The message listener callback.
1274 protected abstract registerWorkerMessageListener
<
1275 Message
extends Data
| Response
1277 workerNodeKey
: number,
1278 listener
: (message
: MessageValue
<Message
>) => void
1282 * Method hooked up after a worker node has been newly created.
1283 * Can be overridden.
1285 * @param workerNodeKey - The newly created worker node key.
1287 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1288 // Listen to worker messages.
1289 this.registerWorkerMessageListener(workerNodeKey
, this.workerListener())
1290 // Send the startup message to worker.
1291 this.sendStartupMessageToWorker(workerNodeKey
)
1292 // Send the statistics message to worker.
1293 this.sendStatisticsMessageToWorker(workerNodeKey
)
1294 if (this.opts
.enableTasksQueue
=== true) {
1295 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1296 this.workerNodes
[workerNodeKey
].onEmptyQueue
=
1297 this.taskStealingOnEmptyQueue
.bind(this)
1299 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1300 this.workerNodes
[workerNodeKey
].onBackPressure
=
1301 this.tasksStealingOnBackPressure
.bind(this)
1307 * Sends the startup message to worker given its worker node key.
1309 * @param workerNodeKey - The worker node key.
1311 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1314 * Sends the statistics message to worker given its worker node key.
1316 * @param workerNodeKey - The worker node key.
1318 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1319 this.sendToWorker(workerNodeKey
, {
1322 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1324 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1330 private redistributeQueuedTasks (workerNodeKey
: number): void {
1331 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1332 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1333 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1334 return workerNode
.info
.ready
&&
1335 workerNode
.usage
.tasks
.queued
<
1336 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1342 const task
= this.dequeueTask(workerNodeKey
) as Task
<Data
>
1343 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1344 this.executeTask(destinationWorkerNodeKey
, task
)
1346 this.enqueueTask(destinationWorkerNodeKey
, task
)
1351 private updateTaskStolenStatisticsWorkerUsage (
1352 workerNodeKey
: number,
1355 const workerNode
= this.workerNodes
[workerNodeKey
]
1356 if (workerNode
?.usage
!= null) {
1357 ++workerNode
.usage
.tasks
.stolen
1360 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1361 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1363 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1366 ++taskFunctionWorkerUsage
.tasks
.stolen
1370 private taskStealingOnEmptyQueue (workerId
: number): void {
1371 const destinationWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(workerId
)
1372 const workerNodes
= this.workerNodes
1375 (workerNodeA
, workerNodeB
) =>
1376 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1378 const sourceWorkerNode
= workerNodes
.find(
1380 workerNode
.info
.ready
&&
1381 workerNode
.info
.id
!== workerId
&&
1382 workerNode
.usage
.tasks
.queued
> 0
1384 if (sourceWorkerNode
!= null) {
1385 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1386 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1387 this.executeTask(destinationWorkerNodeKey
, task
)
1389 this.enqueueTask(destinationWorkerNodeKey
, task
)
1391 this.updateTaskStolenStatisticsWorkerUsage(
1392 destinationWorkerNodeKey
,
1398 private tasksStealingOnBackPressure (workerId
: number): void {
1399 const sizeOffset
= 1
1400 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1403 const sourceWorkerNode
=
1404 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1405 const workerNodes
= this.workerNodes
1408 (workerNodeA
, workerNodeB
) =>
1409 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1411 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1413 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1414 workerNode
.info
.ready
&&
1415 workerNode
.info
.id
!== workerId
&&
1416 workerNode
.usage
.tasks
.queued
<
1417 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1419 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1420 if (this.shallExecuteTask(workerNodeKey
)) {
1421 this.executeTask(workerNodeKey
, task
)
1423 this.enqueueTask(workerNodeKey
, task
)
1425 this.updateTaskStolenStatisticsWorkerUsage(
1434 * This method is the listener registered for each worker message.
1436 * @returns The listener function to execute when a message is received from a worker.
1438 protected workerListener (): (message
: MessageValue
<Response
>) => void {
1440 this.checkMessageWorkerId(message
)
1441 if (message
.ready
!= null && message
.taskFunctionNames
!= null) {
1442 // Worker ready response received from worker
1443 this.handleWorkerReadyResponse(message
)
1444 } else if (message
.taskId
!= null) {
1445 // Task execution response received from worker
1446 this.handleTaskExecutionResponse(message
)
1447 } else if (message
.taskFunctionNames
!= null) {
1448 // Task function names message received from worker
1450 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1451 ).taskFunctionNames
= message
.taskFunctionNames
1456 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1457 if (message
.ready
=== false) {
1459 `Worker ${message.workerId as number} failed to initialize`
1462 const workerInfo
= this.getWorkerInfo(
1463 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1465 workerInfo
.ready
= message
.ready
as boolean
1466 workerInfo
.taskFunctionNames
= message
.taskFunctionNames
1468 this.emitter
?.emit(PoolEvents
.ready
, this.info
)
1472 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1473 const { taskId
, workerError
, data
} = message
1474 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1475 if (promiseResponse
!= null) {
1476 if (workerError
!= null) {
1477 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1478 promiseResponse
.reject(workerError
.message
)
1480 promiseResponse
.resolve(data
as Response
)
1482 const workerNodeKey
= promiseResponse
.workerNodeKey
1483 this.afterTaskExecutionHook(workerNodeKey
, message
)
1484 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1485 this.promiseResponseMap
.delete(taskId
as string)
1487 this.opts
.enableTasksQueue
=== true &&
1488 this.tasksQueueSize(workerNodeKey
) > 0 &&
1489 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
1490 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1494 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1500 private checkAndEmitTaskExecutionEvents (): void {
1502 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1506 private checkAndEmitTaskQueuingEvents (): void {
1507 if (this.hasBackPressure()) {
1508 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1512 private checkAndEmitDynamicWorkerCreationEvents (): void {
1513 if (this.type === PoolTypes
.dynamic
) {
1515 this.emitter
?.emit(PoolEvents
.full
, this.info
)
1521 * Gets the worker information given its worker node key.
1523 * @param workerNodeKey - The worker node key.
1524 * @returns The worker information.
1526 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
{
1527 return this.workerNodes
[workerNodeKey
].info
1531 * Adds the given worker in the pool worker nodes.
1533 * @param worker - The worker.
1534 * @returns The added worker node key.
1535 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1537 private addWorkerNode (worker
: Worker
): number {
1538 const workerNode
= new WorkerNode
<Worker
, Data
>(
1540 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1542 // Flag the worker node as ready at pool startup.
1543 if (this.starting
) {
1544 workerNode
.info
.ready
= true
1546 this.workerNodes
.push(workerNode
)
1547 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1548 if (workerNodeKey
=== -1) {
1549 throw new Error('Worker added not found in worker nodes')
1551 return workerNodeKey
1555 * Removes the given worker from the pool worker nodes.
1557 * @param worker - The worker.
1559 private removeWorkerNode (worker
: Worker
): void {
1560 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1561 if (workerNodeKey
!== -1) {
1562 this.workerNodes
.splice(workerNodeKey
, 1)
1563 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
1568 public hasWorkerNodeBackPressure (workerNodeKey
: number): boolean {
1570 this.opts
.enableTasksQueue
=== true &&
1571 this.workerNodes
[workerNodeKey
].hasBackPressure()
1575 private hasBackPressure (): boolean {
1577 this.opts
.enableTasksQueue
=== true &&
1578 this.workerNodes
.findIndex(
1579 workerNode
=> !workerNode
.hasBackPressure()
1585 * Executes the given task on the worker given its worker node key.
1587 * @param workerNodeKey - The worker node key.
1588 * @param task - The task to execute.
1590 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1591 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1592 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1593 this.checkAndEmitTaskExecutionEvents()
1596 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1597 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1598 this.checkAndEmitTaskQueuingEvents()
1599 return tasksQueueSize
1602 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1603 return this.workerNodes
[workerNodeKey
].dequeueTask()
1606 private tasksQueueSize (workerNodeKey
: number): number {
1607 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1610 protected flushTasksQueue (workerNodeKey
: number): void {
1611 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1614 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1617 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1620 private flushTasksQueues (): void {
1621 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1622 this.flushTasksQueue(workerNodeKey
)