1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import { existsSync
} from
'node:fs'
4 import { type TransferListItem
} from
'node:worker_threads'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
21 updateMeasurementStatistics
23 import { KillBehaviors
} from
'../worker/worker-options'
24 import type { TaskFunction
} from
'../worker/task-functions'
33 type TasksQueueOptions
43 type MeasurementStatisticsRequirements
,
45 WorkerChoiceStrategies
,
46 type WorkerChoiceStrategy
,
47 type WorkerChoiceStrategyOptions
48 } from
'./selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
50 import { version
} from
'./version'
51 import { WorkerNode
} from
'./worker-node'
54 * Base class that implements some shared logic for all poolifier pools.
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
60 export abstract class AbstractPool
<
61 Worker
extends IWorker
,
64 > implements IPool
<Worker
, Data
, Response
> {
66 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
69 public readonly emitter
?: PoolEmitter
72 * The task execution response promise map.
74 * - `key`: The message id of each submitted task.
75 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
77 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
79 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
80 new Map
<string, PromiseResponseWrapper
<Response
>>()
83 * Worker choice strategy context referencing a worker choice algorithm implementation.
85 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
92 * Dynamic pool maximum size property placeholder.
94 protected readonly max
?: number
97 * The task functions added at runtime map:
98 * - `key`: The task function name.
99 * - `value`: The task function itself.
101 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
104 * Whether the pool is starting or not.
106 private readonly starting
: boolean
108 * Whether the pool is started or not.
110 private started
: boolean
112 * The start timestamp of the pool.
114 private readonly startTimestamp
117 * Constructs a new poolifier pool.
119 * @param numberOfWorkers - Number of workers that this pool should manage.
120 * @param filePath - Path to the worker file.
121 * @param opts - Options for the pool.
124 protected readonly numberOfWorkers
: number,
125 protected readonly filePath
: string,
126 protected readonly opts
: PoolOptions
<Worker
>
128 if (!this.isMain()) {
130 'Cannot start a pool from a worker with the same type as the pool'
133 this.checkNumberOfWorkers(this.numberOfWorkers
)
134 this.checkFilePath(this.filePath
)
135 this.checkPoolOptions(this.opts
)
137 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
138 this.executeTask
= this.executeTask
.bind(this)
139 this.enqueueTask
= this.enqueueTask
.bind(this)
141 if (this.opts
.enableEvents
=== true) {
142 this.emitter
= new PoolEmitter()
144 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
150 this.opts
.workerChoiceStrategy
,
151 this.opts
.workerChoiceStrategyOptions
156 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
160 this.starting
= false
163 this.startTimestamp
= performance
.now()
166 private checkFilePath (filePath
: string): void {
169 typeof filePath
!== 'string' ||
170 (typeof filePath
=== 'string' && filePath
.trim().length
=== 0)
172 throw new Error('Please specify a file with a worker implementation')
174 if (!existsSync(filePath
)) {
175 throw new Error(`Cannot find the worker file '${filePath}'`)
179 private checkNumberOfWorkers (numberOfWorkers
: number): void {
180 if (numberOfWorkers
== null) {
182 'Cannot instantiate a pool without specifying the number of workers'
184 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
186 'Cannot instantiate a pool with a non safe integer number of workers'
188 } else if (numberOfWorkers
< 0) {
189 throw new RangeError(
190 'Cannot instantiate a pool with a negative number of workers'
192 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
193 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
197 protected checkDynamicPoolSize (min
: number, max
: number): void {
198 if (this.type === PoolTypes
.dynamic
) {
201 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
203 } else if (!Number.isSafeInteger(max
)) {
205 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
207 } else if (min
> max
) {
208 throw new RangeError(
209 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
211 } else if (max
=== 0) {
212 throw new RangeError(
213 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
215 } else if (min
=== max
) {
216 throw new RangeError(
217 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
223 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
224 if (isPlainObject(opts
)) {
225 this.opts
.workerChoiceStrategy
=
226 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
227 this.checkValidWorkerChoiceStrategy(this.opts
.workerChoiceStrategy
)
228 this.opts
.workerChoiceStrategyOptions
= {
229 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
230 ...opts
.workerChoiceStrategyOptions
232 this.checkValidWorkerChoiceStrategyOptions(
233 this.opts
.workerChoiceStrategyOptions
235 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
236 this.opts
.enableEvents
= opts
.enableEvents
?? true
237 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
238 if (this.opts
.enableTasksQueue
) {
239 this.checkValidTasksQueueOptions(
240 opts
.tasksQueueOptions
as TasksQueueOptions
242 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
243 opts
.tasksQueueOptions
as TasksQueueOptions
247 throw new TypeError('Invalid pool options: must be a plain object')
251 private checkValidWorkerChoiceStrategy (
252 workerChoiceStrategy
: WorkerChoiceStrategy
254 if (!Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)) {
256 `Invalid worker choice strategy '${workerChoiceStrategy}'`
261 private checkValidWorkerChoiceStrategyOptions (
262 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
264 if (!isPlainObject(workerChoiceStrategyOptions
)) {
266 'Invalid worker choice strategy options: must be a plain object'
270 workerChoiceStrategyOptions
.retries
!= null &&
271 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
274 'Invalid worker choice strategy options: retries must be an integer'
278 workerChoiceStrategyOptions
.retries
!= null &&
279 workerChoiceStrategyOptions
.retries
< 0
281 throw new RangeError(
282 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
286 workerChoiceStrategyOptions
.weights
!= null &&
287 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
290 'Invalid worker choice strategy options: must have a weight for each worker node'
294 workerChoiceStrategyOptions
.measurement
!= null &&
295 !Object.values(Measurements
).includes(
296 workerChoiceStrategyOptions
.measurement
300 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
305 private checkValidTasksQueueOptions (
306 tasksQueueOptions
: TasksQueueOptions
308 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
309 throw new TypeError('Invalid tasks queue options: must be a plain object')
312 tasksQueueOptions
?.concurrency
!= null &&
313 !Number.isSafeInteger(tasksQueueOptions
?.concurrency
)
316 'Invalid worker node tasks concurrency: must be an integer'
320 tasksQueueOptions
?.concurrency
!= null &&
321 tasksQueueOptions
?.concurrency
<= 0
323 throw new RangeError(
324 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
327 if (tasksQueueOptions
?.queueMaxSize
!= null) {
329 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
333 tasksQueueOptions
?.size
!= null &&
334 !Number.isSafeInteger(tasksQueueOptions
?.size
)
337 'Invalid worker node tasks queue size: must be an integer'
340 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
?.size
<= 0) {
341 throw new RangeError(
342 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
347 private startPool (): void {
349 this.workerNodes
.reduce(
350 (accumulator
, workerNode
) =>
351 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
353 ) < this.numberOfWorkers
355 this.createAndSetupWorkerNode()
360 public get
info (): PoolInfo
{
366 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
367 minSize
: this.minSize
,
368 maxSize
: this.maxSize
,
369 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
370 .runTime
.aggregate
&&
371 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
372 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
373 workerNodes
: this.workerNodes
.length
,
374 idleWorkerNodes
: this.workerNodes
.reduce(
375 (accumulator
, workerNode
) =>
376 workerNode
.usage
.tasks
.executing
=== 0
381 busyWorkerNodes
: this.workerNodes
.reduce(
382 (accumulator
, workerNode
) =>
383 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
386 executedTasks
: this.workerNodes
.reduce(
387 (accumulator
, workerNode
) =>
388 accumulator
+ workerNode
.usage
.tasks
.executed
,
391 executingTasks
: this.workerNodes
.reduce(
392 (accumulator
, workerNode
) =>
393 accumulator
+ workerNode
.usage
.tasks
.executing
,
396 ...(this.opts
.enableTasksQueue
=== true && {
397 queuedTasks
: this.workerNodes
.reduce(
398 (accumulator
, workerNode
) =>
399 accumulator
+ workerNode
.usage
.tasks
.queued
,
403 ...(this.opts
.enableTasksQueue
=== true && {
404 maxQueuedTasks
: this.workerNodes
.reduce(
405 (accumulator
, workerNode
) =>
406 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
410 ...(this.opts
.enableTasksQueue
=== true && {
411 backPressure
: this.hasBackPressure()
413 ...(this.opts
.enableTasksQueue
=== true && {
414 stolenTasks
: this.workerNodes
.reduce(
415 (accumulator
, workerNode
) =>
416 accumulator
+ workerNode
.usage
.tasks
.stolen
,
420 failedTasks
: this.workerNodes
.reduce(
421 (accumulator
, workerNode
) =>
422 accumulator
+ workerNode
.usage
.tasks
.failed
,
425 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
426 .runTime
.aggregate
&& {
430 ...this.workerNodes
.map(
431 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
437 ...this.workerNodes
.map(
438 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
442 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
443 .runTime
.average
&& {
446 this.workerNodes
.reduce
<number[]>(
447 (accumulator
, workerNode
) =>
448 accumulator
.concat(workerNode
.usage
.runTime
.history
),
454 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
458 this.workerNodes
.reduce
<number[]>(
459 (accumulator
, workerNode
) =>
460 accumulator
.concat(workerNode
.usage
.runTime
.history
),
468 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
469 .waitTime
.aggregate
&& {
473 ...this.workerNodes
.map(
474 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
480 ...this.workerNodes
.map(
481 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
485 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
486 .waitTime
.average
&& {
489 this.workerNodes
.reduce
<number[]>(
490 (accumulator
, workerNode
) =>
491 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
497 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
498 .waitTime
.median
&& {
501 this.workerNodes
.reduce
<number[]>(
502 (accumulator
, workerNode
) =>
503 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
515 * The pool readiness boolean status.
517 private get
ready (): boolean {
519 this.workerNodes
.reduce(
520 (accumulator
, workerNode
) =>
521 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
530 * The approximate pool utilization.
532 * @returns The pool utilization.
534 private get
utilization (): number {
535 const poolTimeCapacity
=
536 (performance
.now() - this.startTimestamp
) * this.maxSize
537 const totalTasksRunTime
= this.workerNodes
.reduce(
538 (accumulator
, workerNode
) =>
539 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
542 const totalTasksWaitTime
= this.workerNodes
.reduce(
543 (accumulator
, workerNode
) =>
544 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
547 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
553 * If it is `'dynamic'`, it provides the `max` property.
555 protected abstract get
type (): PoolType
560 protected abstract get
worker (): WorkerType
563 * The pool minimum size.
565 protected get
minSize (): number {
566 return this.numberOfWorkers
570 * The pool maximum size.
572 protected get
maxSize (): number {
573 return this.max
?? this.numberOfWorkers
577 * Checks if the worker id sent in the received message from a worker is valid.
579 * @param message - The received message.
580 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
582 private checkMessageWorkerId (message
: MessageValue
<Response
>): void {
583 if (message
.workerId
== null) {
584 throw new Error('Worker message received without worker id')
586 message
.workerId
!= null &&
587 this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1
590 `Worker message received from unknown worker '${message.workerId}'`
596 * Gets the given worker its worker node key.
598 * @param worker - The worker.
599 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
601 private getWorkerNodeKeyByWorker (worker
: Worker
): number {
602 return this.workerNodes
.findIndex(
603 workerNode
=> workerNode
.worker
=== worker
608 * Gets the worker node key given its worker id.
610 * @param workerId - The worker id.
611 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
613 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
614 return this.workerNodes
.findIndex(
615 workerNode
=> workerNode
.info
.id
=== workerId
620 public setWorkerChoiceStrategy (
621 workerChoiceStrategy
: WorkerChoiceStrategy
,
622 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
624 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
625 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
626 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
627 this.opts
.workerChoiceStrategy
629 if (workerChoiceStrategyOptions
!= null) {
630 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
632 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
633 workerNode
.resetUsage()
634 this.sendStatisticsMessageToWorker(workerNodeKey
)
639 public setWorkerChoiceStrategyOptions (
640 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
642 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
643 this.opts
.workerChoiceStrategyOptions
= {
644 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
645 ...workerChoiceStrategyOptions
647 this.workerChoiceStrategyContext
.setOptions(
648 this.opts
.workerChoiceStrategyOptions
653 public enableTasksQueue (
655 tasksQueueOptions
?: TasksQueueOptions
657 if (this.opts
.enableTasksQueue
=== true && !enable
) {
658 this.flushTasksQueues()
660 this.opts
.enableTasksQueue
= enable
661 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
665 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
666 if (this.opts
.enableTasksQueue
=== true) {
667 this.checkValidTasksQueueOptions(tasksQueueOptions
)
668 this.opts
.tasksQueueOptions
=
669 this.buildTasksQueueOptions(tasksQueueOptions
)
670 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
671 } else if (this.opts
.tasksQueueOptions
!= null) {
672 delete this.opts
.tasksQueueOptions
676 private setTasksQueueSize (size
: number): void {
677 for (const workerNode
of this.workerNodes
) {
678 workerNode
.tasksQueueBackPressureSize
= size
682 private buildTasksQueueOptions (
683 tasksQueueOptions
: TasksQueueOptions
684 ): TasksQueueOptions
{
687 size
: Math.pow(this.maxSize
, 2),
695 * Whether the pool is full or not.
697 * The pool filling boolean status.
699 protected get
full (): boolean {
700 return this.workerNodes
.length
>= this.maxSize
704 * Whether the pool is busy or not.
706 * The pool busyness boolean status.
708 protected abstract get
busy (): boolean
711 * Whether worker nodes are executing concurrently their tasks quota or not.
713 * @returns Worker nodes busyness boolean status.
715 protected internalBusy (): boolean {
716 if (this.opts
.enableTasksQueue
=== true) {
718 this.workerNodes
.findIndex(
720 workerNode
.info
.ready
&&
721 workerNode
.usage
.tasks
.executing
<
722 (this.opts
.tasksQueueOptions
?.concurrency
as number)
727 this.workerNodes
.findIndex(
729 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
735 private async sendTaskFunctionOperationToWorker (
736 workerNodeKey
: number,
737 message
: MessageValue
<Data
>
738 ): Promise
<boolean> {
739 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
740 return await new Promise
<boolean>((resolve
, reject
) => {
741 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
743 message
.workerId
=== workerId
&&
744 message
.taskFunctionOperationStatus
=== true
748 message
.workerId
=== workerId
&&
749 message
.taskFunctionOperationStatus
=== false
753 `Task function operation ${
754 message.taskFunctionOperation as string
755 } failed on worker ${message.workerId}`
760 this.sendToWorker(workerNodeKey
, message
)
764 private async sendTaskFunctionOperationToWorkers (
765 message
: Omit
<MessageValue
<Data
>, 'workerId'>
766 ): Promise
<boolean> {
767 return await new Promise
<boolean>((resolve
, reject
) => {
768 const responsesReceived
= new Array<MessageValue
<Data
| Response
>>()
769 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
770 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
771 if (message
.taskFunctionOperationStatus
!= null) {
772 responsesReceived
.push(message
)
774 responsesReceived
.length
=== this.workerNodes
.length
&&
775 responsesReceived
.every(
776 message
=> message
.taskFunctionOperationStatus
=== true
781 responsesReceived
.length
=== this.workerNodes
.length
&&
782 responsesReceived
.some(
783 message
=> message
.taskFunctionOperationStatus
=== false
788 `Task function operation ${
789 message.taskFunctionOperation as string
790 } failed on worker ${message.workerId as number}`
796 this.sendToWorker(workerNodeKey
, message
)
802 public hasTaskFunction (name
: string): boolean {
803 for (const workerNode
of this.workerNodes
) {
805 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
806 workerNode
.info
.taskFunctionNames
.includes(name
)
815 public async addTaskFunction (
817 taskFunction
: TaskFunction
<Data
, Response
>
818 ): Promise
<boolean> {
819 this.taskFunctions
.set(name
, taskFunction
)
820 return await this.sendTaskFunctionOperationToWorkers({
821 taskFunctionOperation
: 'add',
822 taskFunctionName
: name
,
823 taskFunction
: taskFunction
.toString()
828 public async removeTaskFunction (name
: string): Promise
<boolean> {
829 this.taskFunctions
.delete(name
)
830 return await this.sendTaskFunctionOperationToWorkers({
831 taskFunctionOperation
: 'remove',
832 taskFunctionName
: name
837 public listTaskFunctionNames (): string[] {
838 for (const workerNode
of this.workerNodes
) {
840 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
841 workerNode
.info
.taskFunctionNames
.length
> 0
843 return workerNode
.info
.taskFunctionNames
850 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
851 return await this.sendTaskFunctionOperationToWorkers({
852 taskFunctionOperation
: 'default',
853 taskFunctionName
: name
857 private shallExecuteTask (workerNodeKey
: number): boolean {
859 this.tasksQueueSize(workerNodeKey
) === 0 &&
860 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
861 (this.opts
.tasksQueueOptions
?.concurrency
as number)
866 public async execute (
869 transferList
?: TransferListItem
[]
870 ): Promise
<Response
> {
871 return await new Promise
<Response
>((resolve
, reject
) => {
873 reject(new Error('Cannot execute a task on destroyed pool'))
876 if (name
!= null && typeof name
!== 'string') {
877 reject(new TypeError('name argument must be a string'))
882 typeof name
=== 'string' &&
883 name
.trim().length
=== 0
885 reject(new TypeError('name argument must not be an empty string'))
888 if (transferList
!= null && !Array.isArray(transferList
)) {
889 reject(new TypeError('transferList argument must be an array'))
892 const timestamp
= performance
.now()
893 const workerNodeKey
= this.chooseWorkerNode()
894 const task
: Task
<Data
> = {
895 name
: name
?? DEFAULT_TASK_NAME
,
896 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
897 data
: data
?? ({} as Data
),
902 this.promiseResponseMap
.set(task
.taskId
as string, {
908 this.opts
.enableTasksQueue
=== false ||
909 (this.opts
.enableTasksQueue
=== true &&
910 this.shallExecuteTask(workerNodeKey
))
912 this.executeTask(workerNodeKey
, task
)
914 this.enqueueTask(workerNodeKey
, task
)
920 public async destroy (): Promise
<void> {
922 this.workerNodes
.map(async (_
, workerNodeKey
) => {
923 await this.destroyWorkerNode(workerNodeKey
)
926 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
930 protected async sendKillMessageToWorker (
931 workerNodeKey
: number
933 await new Promise
<void>((resolve
, reject
) => {
934 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
935 if (message
.kill
=== 'success') {
937 } else if (message
.kill
=== 'failure') {
941 message.workerId as number
942 } kill message handling failed`
947 this.sendToWorker(workerNodeKey
, { kill
: true })
952 * Terminates the worker node given its worker node key.
954 * @param workerNodeKey - The worker node key.
956 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
959 * Setup hook to execute code before worker nodes are created in the abstract constructor.
964 protected setupHook (): void {
965 /* Intentionally empty */
969 * Should return whether the worker is the main worker or not.
971 protected abstract isMain (): boolean
974 * Hook executed before the worker task execution.
977 * @param workerNodeKey - The worker node key.
978 * @param task - The task to execute.
980 protected beforeTaskExecutionHook (
981 workerNodeKey
: number,
984 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
985 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
986 ++workerUsage
.tasks
.executing
987 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
990 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
991 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
995 const taskFunctionWorkerUsage
= this.workerNodes
[
997 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
998 ++taskFunctionWorkerUsage
.tasks
.executing
999 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1004 * Hook executed after the worker task execution.
1005 * Can be overridden.
1007 * @param workerNodeKey - The worker node key.
1008 * @param message - The received message.
1010 protected afterTaskExecutionHook (
1011 workerNodeKey
: number,
1012 message
: MessageValue
<Response
>
1014 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1015 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1016 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1017 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1018 this.updateEluWorkerUsage(workerUsage
, message
)
1021 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1022 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1023 message
.taskPerformance
?.name
as string
1026 const taskFunctionWorkerUsage
= this.workerNodes
[
1028 ].getTaskFunctionWorkerUsage(
1029 message
.taskPerformance
?.name
as string
1031 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1032 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1033 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
1038 * Whether the worker node shall update its task function worker usage or not.
1040 * @param workerNodeKey - The worker node key.
1041 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1043 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1044 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1046 workerInfo
!= null &&
1047 Array.isArray(workerInfo
.taskFunctionNames
) &&
1048 workerInfo
.taskFunctionNames
.length
> 2
1052 private updateTaskStatisticsWorkerUsage (
1053 workerUsage
: WorkerUsage
,
1054 message
: MessageValue
<Response
>
1056 const workerTaskStatistics
= workerUsage
.tasks
1058 workerTaskStatistics
.executing
!= null &&
1059 workerTaskStatistics
.executing
> 0
1061 --workerTaskStatistics
.executing
1063 if (message
.workerError
== null) {
1064 ++workerTaskStatistics
.executed
1066 ++workerTaskStatistics
.failed
1070 private updateRunTimeWorkerUsage (
1071 workerUsage
: WorkerUsage
,
1072 message
: MessageValue
<Response
>
1074 if (message
.workerError
!= null) {
1077 updateMeasurementStatistics(
1078 workerUsage
.runTime
,
1079 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1080 message
.taskPerformance
?.runTime
?? 0
1084 private updateWaitTimeWorkerUsage (
1085 workerUsage
: WorkerUsage
,
1088 const timestamp
= performance
.now()
1089 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1090 updateMeasurementStatistics(
1091 workerUsage
.waitTime
,
1092 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1097 private updateEluWorkerUsage (
1098 workerUsage
: WorkerUsage
,
1099 message
: MessageValue
<Response
>
1101 if (message
.workerError
!= null) {
1104 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1105 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1106 updateMeasurementStatistics(
1107 workerUsage
.elu
.active
,
1108 eluTaskStatisticsRequirements
,
1109 message
.taskPerformance
?.elu
?.active
?? 0
1111 updateMeasurementStatistics(
1112 workerUsage
.elu
.idle
,
1113 eluTaskStatisticsRequirements
,
1114 message
.taskPerformance
?.elu
?.idle
?? 0
1116 if (eluTaskStatisticsRequirements
.aggregate
) {
1117 if (message
.taskPerformance
?.elu
!= null) {
1118 if (workerUsage
.elu
.utilization
!= null) {
1119 workerUsage
.elu
.utilization
=
1120 (workerUsage
.elu
.utilization
+
1121 message
.taskPerformance
.elu
.utilization
) /
1124 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
1131 * Chooses a worker node for the next task.
1133 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1135 * @returns The chosen worker node key
1137 private chooseWorkerNode (): number {
1138 if (this.shallCreateDynamicWorker()) {
1139 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1141 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1143 return workerNodeKey
1146 return this.workerChoiceStrategyContext
.execute()
1150 * Conditions for dynamic worker creation.
1152 * @returns Whether to create a dynamic worker or not.
1154 private shallCreateDynamicWorker (): boolean {
1155 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
1159 * Sends a message to worker given its worker node key.
1161 * @param workerNodeKey - The worker node key.
1162 * @param message - The message.
1163 * @param transferList - The optional array of transferable objects.
1165 protected abstract sendToWorker (
1166 workerNodeKey
: number,
1167 message
: MessageValue
<Data
>,
1168 transferList
?: TransferListItem
[]
1172 * Creates a new worker.
1174 * @returns Newly created worker.
1176 protected abstract createWorker (): Worker
1179 * Creates a new, completely set up worker node.
1181 * @returns New, completely set up worker node key.
1183 protected createAndSetupWorkerNode (): number {
1184 const worker
= this.createWorker()
1186 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1187 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1188 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1189 worker
.on('error', error
=> {
1190 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1191 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1192 workerInfo
.ready
= false
1193 this.workerNodes
[workerNodeKey
].closeChannel()
1194 this.emitter
?.emit(PoolEvents
.error
, error
)
1196 this.opts
.restartWorkerOnError
=== true &&
1200 if (workerInfo
.dynamic
) {
1201 this.createAndSetupDynamicWorkerNode()
1203 this.createAndSetupWorkerNode()
1206 if (this.opts
.enableTasksQueue
=== true) {
1207 this.redistributeQueuedTasks(workerNodeKey
)
1210 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1211 worker
.once('exit', () => {
1212 this.removeWorkerNode(worker
)
1215 const workerNodeKey
= this.addWorkerNode(worker
)
1217 this.afterWorkerNodeSetup(workerNodeKey
)
1219 return workerNodeKey
1223 * Creates a new, completely set up dynamic worker node.
1225 * @returns New, completely set up dynamic worker node key.
1227 protected createAndSetupDynamicWorkerNode (): number {
1228 const workerNodeKey
= this.createAndSetupWorkerNode()
1229 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1230 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1233 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1234 // Kill message received from worker
1236 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1237 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1238 ((this.opts
.enableTasksQueue
=== false &&
1239 workerUsage
.tasks
.executing
=== 0) ||
1240 (this.opts
.enableTasksQueue
=== true &&
1241 workerUsage
.tasks
.executing
=== 0 &&
1242 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1244 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1245 this.emitter
?.emit(PoolEvents
.error
, error
)
1249 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1250 this.sendToWorker(workerNodeKey
, {
1252 workerId
: workerInfo
.id
as number
1254 if (this.taskFunctions
.size
> 0) {
1255 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1256 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1257 taskFunctionOperation
: 'add',
1259 taskFunction
: taskFunction
.toString()
1261 this.emitter
?.emit(PoolEvents
.error
, error
)
1265 workerInfo
.dynamic
= true
1267 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1268 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1270 workerInfo
.ready
= true
1272 this.checkAndEmitDynamicWorkerCreationEvents()
1273 return workerNodeKey
1277 * Registers a listener callback on the worker given its worker node key.
1279 * @param workerNodeKey - The worker node key.
1280 * @param listener - The message listener callback.
1282 protected abstract registerWorkerMessageListener
<
1283 Message
extends Data
| Response
1285 workerNodeKey
: number,
1286 listener
: (message
: MessageValue
<Message
>) => void
1290 * Method hooked up after a worker node has been newly created.
1291 * Can be overridden.
1293 * @param workerNodeKey - The newly created worker node key.
1295 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1296 // Listen to worker messages.
1297 this.registerWorkerMessageListener(workerNodeKey
, this.workerListener())
1298 // Send the startup message to worker.
1299 this.sendStartupMessageToWorker(workerNodeKey
)
1300 // Send the statistics message to worker.
1301 this.sendStatisticsMessageToWorker(workerNodeKey
)
1302 if (this.opts
.enableTasksQueue
=== true) {
1303 this.workerNodes
[workerNodeKey
].onEmptyQueue
=
1304 this.taskStealingOnEmptyQueue
.bind(this)
1305 this.workerNodes
[workerNodeKey
].onBackPressure
=
1306 this.tasksStealingOnBackPressure
.bind(this)
1311 * Sends the startup message to worker given its worker node key.
1313 * @param workerNodeKey - The worker node key.
1315 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1318 * Sends the statistics message to worker given its worker node key.
1320 * @param workerNodeKey - The worker node key.
1322 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1323 this.sendToWorker(workerNodeKey
, {
1326 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1328 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1334 private redistributeQueuedTasks (workerNodeKey
: number): void {
1335 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1336 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1337 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1338 return workerNode
.info
.ready
&&
1339 workerNode
.usage
.tasks
.queued
<
1340 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1346 const task
= this.dequeueTask(workerNodeKey
) as Task
<Data
>
1347 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1348 this.executeTask(destinationWorkerNodeKey
, task
)
1350 this.enqueueTask(destinationWorkerNodeKey
, task
)
1355 private updateTaskStolenStatisticsWorkerUsage (
1356 workerNodeKey
: number,
1359 const workerNode
= this.workerNodes
[workerNodeKey
]
1360 if (workerNode
?.usage
!= null) {
1361 ++workerNode
.usage
.tasks
.stolen
1364 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1365 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1367 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1370 ++taskFunctionWorkerUsage
.tasks
.stolen
1374 private taskStealingOnEmptyQueue (workerId
: number): void {
1375 const destinationWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(workerId
)
1376 const workerNodes
= this.workerNodes
1379 (workerNodeA
, workerNodeB
) =>
1380 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1382 const sourceWorkerNode
= workerNodes
.find(
1384 workerNode
.info
.ready
&&
1385 workerNode
.info
.id
!== workerId
&&
1386 workerNode
.usage
.tasks
.queued
> 0
1388 if (sourceWorkerNode
!= null) {
1389 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1390 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1391 this.executeTask(destinationWorkerNodeKey
, task
)
1393 this.enqueueTask(destinationWorkerNodeKey
, task
)
1395 this.updateTaskStolenStatisticsWorkerUsage(
1396 destinationWorkerNodeKey
,
1402 private tasksStealingOnBackPressure (workerId
: number): void {
1403 const sizeOffset
= 1
1404 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1407 const sourceWorkerNode
=
1408 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1409 const workerNodes
= this.workerNodes
1412 (workerNodeA
, workerNodeB
) =>
1413 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1415 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1417 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1418 workerNode
.info
.ready
&&
1419 workerNode
.info
.id
!== workerId
&&
1420 workerNode
.usage
.tasks
.queued
<
1421 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1423 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1424 if (this.shallExecuteTask(workerNodeKey
)) {
1425 this.executeTask(workerNodeKey
, task
)
1427 this.enqueueTask(workerNodeKey
, task
)
1429 this.updateTaskStolenStatisticsWorkerUsage(
1438 * This method is the listener registered for each worker message.
1440 * @returns The listener function to execute when a message is received from a worker.
1442 protected workerListener (): (message
: MessageValue
<Response
>) => void {
1444 this.checkMessageWorkerId(message
)
1445 if (message
.ready
!= null && message
.taskFunctionNames
!= null) {
1446 // Worker ready response received from worker
1447 this.handleWorkerReadyResponse(message
)
1448 } else if (message
.taskId
!= null) {
1449 // Task execution response received from worker
1450 this.handleTaskExecutionResponse(message
)
1451 } else if (message
.taskFunctionNames
!= null) {
1452 // Task function names message received from worker
1454 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1455 ).taskFunctionNames
= message
.taskFunctionNames
1460 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1461 if (message
.ready
=== false) {
1463 `Worker ${message.workerId as number} failed to initialize`
1466 const workerInfo
= this.getWorkerInfo(
1467 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1469 workerInfo
.ready
= message
.ready
as boolean
1470 workerInfo
.taskFunctionNames
= message
.taskFunctionNames
1471 if (this.emitter
!= null && this.ready
) {
1472 this.emitter
.emit(PoolEvents
.ready
, this.info
)
1476 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1477 const { taskId
, workerError
, data
} = message
1478 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1479 if (promiseResponse
!= null) {
1480 if (workerError
!= null) {
1481 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1482 promiseResponse
.reject(workerError
.message
)
1484 promiseResponse
.resolve(data
as Response
)
1486 const workerNodeKey
= promiseResponse
.workerNodeKey
1487 this.afterTaskExecutionHook(workerNodeKey
, message
)
1488 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1489 this.promiseResponseMap
.delete(taskId
as string)
1491 this.opts
.enableTasksQueue
=== true &&
1492 this.tasksQueueSize(workerNodeKey
) > 0 &&
1493 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
1494 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1498 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1504 private checkAndEmitTaskExecutionEvents (): void {
1506 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1510 private checkAndEmitTaskQueuingEvents (): void {
1511 if (this.hasBackPressure()) {
1512 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1516 private checkAndEmitDynamicWorkerCreationEvents (): void {
1517 if (this.type === PoolTypes
.dynamic
) {
1519 this.emitter
?.emit(PoolEvents
.full
, this.info
)
1525 * Gets the worker information given its worker node key.
1527 * @param workerNodeKey - The worker node key.
1528 * @returns The worker information.
1530 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
{
1531 return this.workerNodes
[workerNodeKey
].info
1535 * Adds the given worker in the pool worker nodes.
1537 * @param worker - The worker.
1538 * @returns The added worker node key.
1539 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1541 private addWorkerNode (worker
: Worker
): number {
1542 const workerNode
= new WorkerNode
<Worker
, Data
>(
1544 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1546 // Flag the worker node as ready at pool startup.
1547 if (this.starting
) {
1548 workerNode
.info
.ready
= true
1550 this.workerNodes
.push(workerNode
)
1551 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1552 if (workerNodeKey
=== -1) {
1553 throw new Error('Worker added not found in worker nodes')
1555 return workerNodeKey
1559 * Removes the given worker from the pool worker nodes.
1561 * @param worker - The worker.
1563 private removeWorkerNode (worker
: Worker
): void {
1564 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1565 if (workerNodeKey
!== -1) {
1566 this.workerNodes
.splice(workerNodeKey
, 1)
1567 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
1572 public hasWorkerNodeBackPressure (workerNodeKey
: number): boolean {
1574 this.opts
.enableTasksQueue
=== true &&
1575 this.workerNodes
[workerNodeKey
].hasBackPressure()
1579 private hasBackPressure (): boolean {
1581 this.opts
.enableTasksQueue
=== true &&
1582 this.workerNodes
.findIndex(
1583 workerNode
=> !workerNode
.hasBackPressure()
1589 * Executes the given task on the worker given its worker node key.
1591 * @param workerNodeKey - The worker node key.
1592 * @param task - The task to execute.
1594 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1595 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1596 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1597 this.checkAndEmitTaskExecutionEvents()
1600 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1601 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1602 this.checkAndEmitTaskQueuingEvents()
1603 return tasksQueueSize
1606 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1607 return this.workerNodes
[workerNodeKey
].dequeueTask()
1610 private tasksQueueSize (workerNodeKey
: number): number {
1611 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1614 protected flushTasksQueue (workerNodeKey
: number): void {
1615 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1618 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1621 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1624 private flushTasksQueues (): void {
1625 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1626 this.flushTasksQueue(workerNodeKey
)