1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import { existsSync
} from
'node:fs'
4 import { type TransferListItem
} from
'node:worker_threads'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
21 updateMeasurementStatistics
23 import { KillBehaviors
} from
'../worker/worker-options'
24 import type { TaskFunction
} from
'../worker/task-functions'
33 type TasksQueueOptions
43 type MeasurementStatisticsRequirements
,
45 WorkerChoiceStrategies
,
46 type WorkerChoiceStrategy
,
47 type WorkerChoiceStrategyOptions
48 } from
'./selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
50 import { version
} from
'./version'
51 import { WorkerNode
} from
'./worker-node'
54 * Base class that implements some shared logic for all poolifier pools.
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
60 export abstract class AbstractPool
<
61 Worker
extends IWorker
,
64 > implements IPool
<Worker
, Data
, Response
> {
66 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
69 public readonly emitter
?: PoolEmitter
72 * The task execution response promise map:
73 * - `key`: The message id of each submitted task.
74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
78 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
79 new Map
<string, PromiseResponseWrapper
<Response
>>()
82 * Worker choice strategy context referencing a worker choice algorithm implementation.
84 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
91 * Dynamic pool maximum size property placeholder.
93 protected readonly max
?: number
96 * The task functions added at runtime map:
97 * - `key`: The task function name.
98 * - `value`: The task function itself.
100 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
103 * Whether the pool is starting or not.
105 private readonly starting
: boolean
107 * Whether the pool is started or not.
109 private started
: boolean
111 * The start timestamp of the pool.
113 private readonly startTimestamp
116 * Constructs a new poolifier pool.
118 * @param numberOfWorkers - Number of workers that this pool should manage.
119 * @param filePath - Path to the worker file.
120 * @param opts - Options for the pool.
123 protected readonly numberOfWorkers
: number,
124 protected readonly filePath
: string,
125 protected readonly opts
: PoolOptions
<Worker
>
127 if (!this.isMain()) {
129 'Cannot start a pool from a worker with the same type as the pool'
132 this.checkNumberOfWorkers(this.numberOfWorkers
)
133 this.checkFilePath(this.filePath
)
134 this.checkPoolOptions(this.opts
)
136 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
137 this.executeTask
= this.executeTask
.bind(this)
138 this.enqueueTask
= this.enqueueTask
.bind(this)
140 if (this.opts
.enableEvents
=== true) {
141 this.emitter
= new PoolEmitter()
143 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
149 this.opts
.workerChoiceStrategy
,
150 this.opts
.workerChoiceStrategyOptions
155 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
159 this.starting
= false
162 this.startTimestamp
= performance
.now()
165 private checkFilePath (filePath
: string): void {
168 typeof filePath
!== 'string' ||
169 (typeof filePath
=== 'string' && filePath
.trim().length
=== 0)
171 throw new Error('Please specify a file with a worker implementation')
173 if (!existsSync(filePath
)) {
174 throw new Error(`Cannot find the worker file '${filePath}'`)
178 private checkNumberOfWorkers (numberOfWorkers
: number): void {
179 if (numberOfWorkers
== null) {
181 'Cannot instantiate a pool without specifying the number of workers'
183 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
185 'Cannot instantiate a pool with a non safe integer number of workers'
187 } else if (numberOfWorkers
< 0) {
188 throw new RangeError(
189 'Cannot instantiate a pool with a negative number of workers'
191 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
192 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
196 protected checkDynamicPoolSize (min
: number, max
: number): void {
197 if (this.type === PoolTypes
.dynamic
) {
200 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
202 } else if (!Number.isSafeInteger(max
)) {
204 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
206 } else if (min
> max
) {
207 throw new RangeError(
208 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
210 } else if (max
=== 0) {
211 throw new RangeError(
212 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
214 } else if (min
=== max
) {
215 throw new RangeError(
216 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
/**
 * Validates the given pool options and writes the defaulted values onto `this.opts`.
 *
 * Defaults applied when an option is nullish: round robin worker choice strategy,
 * `restartWorkerOnError` and `enableEvents` set to `true`, `enableTasksQueue` set to `false`.
 *
 * @param opts - The pool options to check.
 * @throws TypeError If `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)
  // User supplied strategy options take precedence over the defaults.
  poolOptions.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  if (poolOptions.enableTasksQueue) {
    // Tasks queue options are only validated and built when the queue is enabled.
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
  }
}
250 private checkValidWorkerChoiceStrategy (
251 workerChoiceStrategy
: WorkerChoiceStrategy
253 if (!Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)) {
255 `Invalid worker choice strategy '${workerChoiceStrategy}'`
260 private checkValidWorkerChoiceStrategyOptions (
261 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
263 if (!isPlainObject(workerChoiceStrategyOptions
)) {
265 'Invalid worker choice strategy options: must be a plain object'
269 workerChoiceStrategyOptions
.retries
!= null &&
270 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
273 'Invalid worker choice strategy options: retries must be an integer'
277 workerChoiceStrategyOptions
.retries
!= null &&
278 workerChoiceStrategyOptions
.retries
< 0
280 throw new RangeError(
281 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
285 workerChoiceStrategyOptions
.weights
!= null &&
286 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
289 'Invalid worker choice strategy options: must have a weight for each worker node'
293 workerChoiceStrategyOptions
.measurement
!= null &&
294 !Object.values(Measurements
).includes(
295 workerChoiceStrategyOptions
.measurement
299 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
304 private checkValidTasksQueueOptions (
305 tasksQueueOptions
: TasksQueueOptions
307 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
308 throw new TypeError('Invalid tasks queue options: must be a plain object')
311 tasksQueueOptions
?.concurrency
!= null &&
312 !Number.isSafeInteger(tasksQueueOptions
?.concurrency
)
315 'Invalid worker node tasks concurrency: must be an integer'
319 tasksQueueOptions
?.concurrency
!= null &&
320 tasksQueueOptions
?.concurrency
<= 0
322 throw new RangeError(
323 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
326 if (tasksQueueOptions
?.queueMaxSize
!= null) {
328 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
332 tasksQueueOptions
?.size
!= null &&
333 !Number.isSafeInteger(tasksQueueOptions
?.size
)
336 'Invalid worker node tasks queue size: must be an integer'
339 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
?.size
<= 0) {
340 throw new RangeError(
341 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
346 private startPool (): void {
348 this.workerNodes
.reduce(
349 (accumulator
, workerNode
) =>
350 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
352 ) < this.numberOfWorkers
354 this.createAndSetupWorkerNode()
359 public get
info (): PoolInfo
{
365 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
366 minSize
: this.minSize
,
367 maxSize
: this.maxSize
,
368 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
369 .runTime
.aggregate
&&
370 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
371 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
372 workerNodes
: this.workerNodes
.length
,
373 idleWorkerNodes
: this.workerNodes
.reduce(
374 (accumulator
, workerNode
) =>
375 workerNode
.usage
.tasks
.executing
=== 0
380 busyWorkerNodes
: this.workerNodes
.reduce(
381 (accumulator
, workerNode
) =>
382 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
385 executedTasks
: this.workerNodes
.reduce(
386 (accumulator
, workerNode
) =>
387 accumulator
+ workerNode
.usage
.tasks
.executed
,
390 executingTasks
: this.workerNodes
.reduce(
391 (accumulator
, workerNode
) =>
392 accumulator
+ workerNode
.usage
.tasks
.executing
,
395 ...(this.opts
.enableTasksQueue
=== true && {
396 queuedTasks
: this.workerNodes
.reduce(
397 (accumulator
, workerNode
) =>
398 accumulator
+ workerNode
.usage
.tasks
.queued
,
402 ...(this.opts
.enableTasksQueue
=== true && {
403 maxQueuedTasks
: this.workerNodes
.reduce(
404 (accumulator
, workerNode
) =>
405 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
409 ...(this.opts
.enableTasksQueue
=== true && {
410 backPressure
: this.hasBackPressure()
412 ...(this.opts
.enableTasksQueue
=== true && {
413 stolenTasks
: this.workerNodes
.reduce(
414 (accumulator
, workerNode
) =>
415 accumulator
+ workerNode
.usage
.tasks
.stolen
,
419 failedTasks
: this.workerNodes
.reduce(
420 (accumulator
, workerNode
) =>
421 accumulator
+ workerNode
.usage
.tasks
.failed
,
424 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
425 .runTime
.aggregate
&& {
429 ...this.workerNodes
.map(
430 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
436 ...this.workerNodes
.map(
437 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
441 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
442 .runTime
.average
&& {
445 this.workerNodes
.reduce
<number[]>(
446 (accumulator
, workerNode
) =>
447 accumulator
.concat(workerNode
.usage
.runTime
.history
),
453 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
457 this.workerNodes
.reduce
<number[]>(
458 (accumulator
, workerNode
) =>
459 accumulator
.concat(workerNode
.usage
.runTime
.history
),
467 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
468 .waitTime
.aggregate
&& {
472 ...this.workerNodes
.map(
473 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
479 ...this.workerNodes
.map(
480 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
484 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
485 .waitTime
.average
&& {
488 this.workerNodes
.reduce
<number[]>(
489 (accumulator
, workerNode
) =>
490 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
496 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
497 .waitTime
.median
&& {
500 this.workerNodes
.reduce
<number[]>(
501 (accumulator
, workerNode
) =>
502 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
514 * The pool readiness boolean status.
516 private get
ready (): boolean {
518 this.workerNodes
.reduce(
519 (accumulator
, workerNode
) =>
520 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
/**
 * The approximate pool utilization.
 *
 * Computed as the sum of the worker nodes aggregated run time and wait time,
 * divided by the time elapsed since the pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  // Keep the two totals separate so the summation order is unchanged.
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
552 * If it is `'dynamic'`, it provides the `max` property.
554 protected abstract get
type (): PoolType
559 protected abstract get
worker (): WorkerType
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
/**
 * The pool maximum size: `max` when it is set, the number of workers otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
/**
 * Checks that the worker id carried by a message received from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // From here on the worker id is known to be defined.
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const { workerNodes } = this
  for (let nodeKey = 0; nodeKey < workerNodes.length; nodeKey++) {
    if (workerNodes[nodeKey].worker === worker) {
      return nodeKey
    }
  }
  return -1
}
/**
 * Gets the worker node key of the worker node whose info id matches the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  const { workerNodes } = this
  for (let nodeKey = 0; nodeKey < workerNodes.length; nodeKey++) {
    if (workerNodes[nodeKey].info.id === workerId) {
      return nodeKey
    }
  }
  return -1
}
/**
 * Sets the worker choice strategy used by the pool.
 *
 * After the strategy is switched, every worker node usage is reset and the
 * statistics message is sent again to each worker.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @param workerChoiceStrategyOptions - The optional worker choice strategy options, applied when not nullish.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  const { workerNodes } = this
  for (let nodeKey = 0; nodeKey < workerNodes.length; nodeKey++) {
    workerNodes[nodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(nodeKey)
  }
}
/**
 * Sets the worker choice strategy options used by the pool.
 *
 * The given options are validated, merged over the default worker choice strategy options,
 * stored in the pool options and handed to the worker choice strategy context.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
/**
 * Enables or disables the worker nodes tasks queue.
 *
 * When the queue was enabled and is being disabled, the tasks queues are flushed first.
 *
 * @param enable - Whether to enable the tasks queue or not.
 * @param tasksQueueOptions - The optional tasks queue options, forwarded to `setTasksQueueOptions()`.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
/**
 * Sets the worker nodes tasks queue options.
 *
 * When the tasks queue is enabled, the options are validated, built and stored, and the
 * resulting queue size is propagated to the worker nodes. When it is not enabled, any
 * previously stored tasks queue options are removed from the pool options.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueSize(builtOptions.size as number)
}
/**
 * Assigns the given size to the `tasksQueueBackPressureSize` property of every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  const { workerNodes } = this
  for (let nodeKey = 0; nodeKey < workerNodes.length; nodeKey++) {
    workerNodes[nodeKey].tasksQueueBackPressureSize = size
  }
}
681 private buildTasksQueueOptions (
682 tasksQueueOptions
: TasksQueueOptions
683 ): TasksQueueOptions
{
686 size
: Math.pow(this.maxSize
, 2),
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
703 * Whether the pool is busy or not.
705 * The pool busyness boolean status.
707 protected abstract get
busy (): boolean
710 * Whether worker nodes are concurrently executing their tasks quota or not.
712 * @returns Worker nodes busyness boolean status.
714 protected internalBusy (): boolean {
715 if (this.opts
.enableTasksQueue
=== true) {
717 this.workerNodes
.findIndex(
719 workerNode
.info
.ready
&&
720 workerNode
.usage
.tasks
.executing
<
721 (this.opts
.tasksQueueOptions
?.concurrency
as number)
726 this.workerNodes
.findIndex(
728 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
733 private async sendTaskFunctionOperationToWorker (
734 workerNodeKey
: number,
735 message
: MessageValue
<Data
>
736 ): Promise
<boolean> {
737 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
738 return await new Promise
<boolean>((resolve
, reject
) => {
739 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
741 message
.workerId
=== workerId
&&
742 message
.taskFunctionOperationStatus
=== true
746 message
.workerId
=== workerId
&&
747 message
.taskFunctionOperationStatus
=== false
751 `Task function operation ${
752 message.taskFunctionOperation as string
753 } failed on worker ${message.workerId}`
758 this.sendToWorker(workerNodeKey
, message
)
762 private async sendTaskFunctionOperationToWorkers (
763 message
: Omit
<MessageValue
<Data
>, 'workerId'>
764 ): Promise
<boolean> {
765 return await new Promise
<boolean>((resolve
, reject
) => {
766 const responsesReceived
= new Array<MessageValue
<Data
| Response
>>()
767 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
768 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
769 if (message
.taskFunctionOperationStatus
!= null) {
770 responsesReceived
.push(message
)
772 responsesReceived
.length
=== this.workerNodes
.length
&&
773 responsesReceived
.every(
774 message
=> message
.taskFunctionOperationStatus
=== true
779 responsesReceived
.length
=== this.workerNodes
.length
&&
780 responsesReceived
.some(
781 message
=> message
.taskFunctionOperationStatus
=== false
786 `Task function operation ${
787 message.taskFunctionOperation as string
788 } failed on worker ${message.workerId as number}`
794 this.sendToWorker(workerNodeKey
, message
)
800 public hasTaskFunction (name
: string): boolean {
801 for (const workerNode
of this.workerNodes
) {
803 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
804 workerNode
.info
.taskFunctionNames
.includes(name
)
/**
 * Adds a task function to the pool and asks every worker to register it.
 *
 * The arguments are validated before the pool-side registry is touched: the registry is
 * replayed through `taskFunction.toString()` to newly created dynamic workers, so an
 * invalid entry stored there would break later dynamic worker creation.
 *
 * @param name - The name of the task function. Must be a non-empty string.
 * @param taskFunction - The task function. It is sent to the workers as its `toString()` source text.
 * @returns The promise of the 'add' task function operation sent to the workers.
 * @throws TypeError (as a rejection) If `name` is not a non-empty string or `taskFunction` is not a function.
 */
public async addTaskFunction (
  name: string,
  taskFunction: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof taskFunction !== 'function') {
    throw new TypeError('taskFunction argument must be a function')
  }
  this.taskFunctions.set(name, taskFunction)
  return await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: taskFunction.toString()
  })
}
/**
 * Removes a task function from the pool-side registry and asks every worker to remove it.
 *
 * @param name - The name of the task function.
 * @returns The promise of the 'remove' task function operation sent to the workers.
 */
public async removeTaskFunction (name: string): Promise<boolean> {
  this.taskFunctions.delete(name)
  const removeOperation: Omit<MessageValue<Data>, 'workerId'> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(removeOperation)
}
835 public listTaskFunctionNames (): string[] {
836 for (const workerNode
of this.workerNodes
) {
838 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
839 workerNode
.info
.taskFunctionNames
.length
> 0
841 return workerNode
.info
.taskFunctionNames
/**
 * Asks every worker to use the named task function as its default one.
 *
 * @param name - The name of the task function.
 * @returns The promise of the 'default' task function operation sent to the workers.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperation: Omit<MessageValue<Data>, 'workerId'> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(defaultOperation)
}
/**
 * Whether a task shall be executed right away on the given worker node instead of being queued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and its number of executing tasks is below the configured tasks queue concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  return this.workerNodes[workerNodeKey].usage.tasks.executing < concurrency
}
864 public async execute (
867 transferList
?: TransferListItem
[]
868 ): Promise
<Response
> {
869 return await new Promise
<Response
>((resolve
, reject
) => {
871 reject(new Error('Cannot execute a task on destroyed pool'))
874 if (name
!= null && typeof name
!== 'string') {
875 reject(new TypeError('name argument must be a string'))
880 typeof name
=== 'string' &&
881 name
.trim().length
=== 0
883 reject(new TypeError('name argument must not be an empty string'))
886 if (transferList
!= null && !Array.isArray(transferList
)) {
887 reject(new TypeError('transferList argument must be an array'))
890 const timestamp
= performance
.now()
891 const workerNodeKey
= this.chooseWorkerNode()
892 const task
: Task
<Data
> = {
893 name
: name
?? DEFAULT_TASK_NAME
,
894 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
895 data
: data
?? ({} as Data
),
900 this.promiseResponseMap
.set(task
.taskId
as string, {
906 this.opts
.enableTasksQueue
=== false ||
907 (this.opts
.enableTasksQueue
=== true &&
908 this.shallExecuteTask(workerNodeKey
))
910 this.executeTask(workerNodeKey
, task
)
912 this.enqueueTask(workerNodeKey
, task
)
918 public async destroy (): Promise
<void> {
920 this.workerNodes
.map(async (_
, workerNodeKey
) => {
921 await this.destroyWorkerNode(workerNodeKey
)
924 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
928 protected async sendKillMessageToWorker (
929 workerNodeKey
: number
931 await new Promise
<void>((resolve
, reject
) => {
932 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
933 if (message
.kill
=== 'success') {
935 } else if (message
.kill
=== 'failure') {
939 message.workerId as number
940 } kill message handling failed`
945 this.sendToWorker(workerNodeKey
, { kill
: true })
950 * Terminates the worker node given its worker node key.
952 * @param workerNodeKey - The worker node key.
954 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
/**
 * Setup hook meant to run code before worker nodes are created in the abstract constructor.
 *
 * The base implementation does nothing.
 */
protected setupHook (): void {
  // Intentionally empty
}
967 * Should return whether the worker is the main worker or not.
969 protected abstract isMain (): boolean
972 * Hook executed before the worker task execution.
975 * @param workerNodeKey - The worker node key.
976 * @param task - The task to execute.
978 protected beforeTaskExecutionHook (
979 workerNodeKey
: number,
982 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
983 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
984 ++workerUsage
.tasks
.executing
985 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
988 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
989 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
993 const taskFunctionWorkerUsage
= this.workerNodes
[
995 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
996 ++taskFunctionWorkerUsage
.tasks
.executing
997 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1002 * Hook executed after the worker task execution.
1003 * Can be overridden.
1005 * @param workerNodeKey - The worker node key.
1006 * @param message - The received message.
1008 protected afterTaskExecutionHook (
1009 workerNodeKey
: number,
1010 message
: MessageValue
<Response
>
1012 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1013 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1014 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1015 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1016 this.updateEluWorkerUsage(workerUsage
, message
)
1019 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1020 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1021 message
.taskPerformance
?.name
as string
1024 const taskFunctionWorkerUsage
= this.workerNodes
[
1026 ].getTaskFunctionWorkerUsage(
1027 message
.taskPerformance
?.name
as string
1029 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1030 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1031 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * This is the case when the worker info exists and lists more than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
/**
 * Updates the tasks counters of a worker usage from a received task message.
 *
 * The executing counter is decremented when it is positive. Then the failed counter is
 * incremented if the message carries a worker error, the executed counter otherwise.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // Never let the executing counter go below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
/**
 * Updates the run time measurement statistics of a worker usage from a received task message.
 *
 * Nothing is updated when the message carries a worker error. A missing run time counts as `0`.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const runTimeRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
1082 private updateWaitTimeWorkerUsage (
1083 workerUsage
: WorkerUsage
,
1086 const timestamp
= performance
.now()
1087 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1088 updateMeasurementStatistics(
1089 workerUsage
.waitTime
,
1090 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1095 private updateEluWorkerUsage (
1096 workerUsage
: WorkerUsage
,
1097 message
: MessageValue
<Response
>
1099 if (message
.workerError
!= null) {
1102 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1103 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1104 updateMeasurementStatistics(
1105 workerUsage
.elu
.active
,
1106 eluTaskStatisticsRequirements
,
1107 message
.taskPerformance
?.elu
?.active
?? 0
1109 updateMeasurementStatistics(
1110 workerUsage
.elu
.idle
,
1111 eluTaskStatisticsRequirements
,
1112 message
.taskPerformance
?.elu
?.idle
?? 0
1114 if (eluTaskStatisticsRequirements
.aggregate
) {
1115 if (message
.taskPerformance
?.elu
!= null) {
1116 if (workerUsage
.elu
.utilization
!= null) {
1117 workerUsage
.elu
.utilization
=
1118 (workerUsage
.elu
.utilization
+
1119 message
.taskPerformance
.elu
.utilization
) /
1122 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is returned if the
 * current strategy policy has `dynamicWorkerUsage` set. In every other case the choice is
 * delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key.
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1157 * Sends a message to worker given its worker node key.
1159 * @param workerNodeKey - The worker node key.
1160 * @param message - The message.
1161 * @param transferList - The optional array of transferable objects.
1163 protected abstract sendToWorker (
1164 workerNodeKey
: number,
1165 message
: MessageValue
<Data
>,
1166 transferList
?: TransferListItem
[]
1170 * Creates a new worker.
1172 * @returns Newly created worker.
1174 protected abstract createWorker (): Worker
1177 * Creates a new, completely set up worker node.
1179 * @returns New, completely set up worker node key.
1181 protected createAndSetupWorkerNode (): number {
1182 const worker
= this.createWorker()
1184 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1185 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1186 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1187 worker
.on('error', error
=> {
1188 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1189 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1190 workerInfo
.ready
= false
1191 this.workerNodes
[workerNodeKey
].closeChannel()
1192 this.emitter
?.emit(PoolEvents
.error
, error
)
1194 this.opts
.restartWorkerOnError
=== true &&
1198 if (workerInfo
.dynamic
) {
1199 this.createAndSetupDynamicWorkerNode()
1201 this.createAndSetupWorkerNode()
1204 if (this.opts
.enableTasksQueue
=== true) {
1205 this.redistributeQueuedTasks(workerNodeKey
)
1208 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1209 worker
.once('exit', () => {
1210 this.removeWorkerNode(worker
)
1213 const workerNodeKey
= this.addWorkerNode(worker
)
1215 this.afterWorkerNodeSetup(workerNodeKey
)
1217 return workerNodeKey
1221 * Creates a new, completely set up dynamic worker node.
1223 * @returns New, completely set up dynamic worker node key.
1225 protected createAndSetupDynamicWorkerNode (): number {
1226 const workerNodeKey
= this.createAndSetupWorkerNode()
1227 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1228 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1231 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1232 // Kill message received from worker
1234 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1235 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1236 ((this.opts
.enableTasksQueue
=== false &&
1237 workerUsage
.tasks
.executing
=== 0) ||
1238 (this.opts
.enableTasksQueue
=== true &&
1239 workerUsage
.tasks
.executing
=== 0 &&
1240 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1242 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1243 this.emitter
?.emit(PoolEvents
.error
, error
)
1247 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1248 this.sendToWorker(workerNodeKey
, {
1251 if (this.taskFunctions
.size
> 0) {
1252 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1253 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1254 taskFunctionOperation
: 'add',
1256 taskFunction
: taskFunction
.toString()
1258 this.emitter
?.emit(PoolEvents
.error
, error
)
1262 workerInfo
.dynamic
= true
1264 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1265 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1267 workerInfo
.ready
= true
1269 this.checkAndEmitDynamicWorkerCreationEvents()
1270 return workerNodeKey
1274 * Registers a listener callback on the worker given its worker node key.
1276 * @param workerNodeKey - The worker node key.
1277 * @param listener - The message listener callback.
// Abstract member: takes a worker node key and a listener receiving a
// MessageValue<Message>, where Message is constrained to Data | Response.
// NOTE(review): the end of the generic/parameter lists and the return type are
// not visible in this extract.
1279 protected abstract registerWorkerMessageListener
<
1280 Message
extends Data
| Response
1282 workerNodeKey
: number,
1283 listener
: (message
: MessageValue
<Message
>) => void
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Wire the pool message listener on the worker.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Startup message first, then statistics message.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Queue callbacks are only installed when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
// Sends a message to the worker node through sendToWorker(). The visible part
// of the payload reads workerChoiceStrategyContext.getTaskStatisticsRequirements(),
// including for an `elu` entry.
// NOTE(review): the remaining payload lines are absent from this extract.
1319 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1320 this.sendToWorker(workerNodeKey
, {
1323 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1325 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
// Empties the tasks queue of the given worker node: while it holds tasks, a
// destination worker node key is computed with a reduce over workerNodes whose
// callback looks at node readiness and queued tasks counts, then a task is
// dequeued from the source. executeTask() is guarded by
// shallExecuteTask(destination); enqueueTask() is presumably the alternative branch.
// NOTE(review): the end of the reduce callback, its initial value and the
// `else` line are absent from this extract.
1331 private redistributeQueuedTasks (workerNodeKey
: number): void {
1332 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1333 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1334 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1335 return workerNode
.info
.ready
&&
1336 workerNode
.usage
.tasks
.queued
<
1337 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1343 const task
= this.dequeueTask(workerNodeKey
) as Task
<Data
>
1344 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1345 this.executeTask(destinationWorkerNodeKey
, task
)
1347 this.enqueueTask(destinationWorkerNodeKey
, task
)
// Increments the `tasks.stolen` counter of the worker node usage when that
// usage is defined, and of the task function worker usage when
// shallUpdateTaskFunctionWorkerUsage(workerNodeKey) holds and that usage exists.
// NOTE(review): the `taskName` parameter declaration and the end of the
// signature are not visible in this extract.
1352 private updateTaskStolenStatisticsWorkerUsage (
1353 workerNodeKey
: number,
1356 const workerNode
= this.workerNodes
[workerNodeKey
]
1357 if (workerNode
?.usage
!= null) {
1358 ++workerNode
.usage
.tasks
.stolen
1361 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1362 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1364 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1367 ++taskFunctionWorkerUsage
.tasks
.stolen
// Task stealing handler bound as a worker node `onEmptyQueue` callback.
// Resolves the destination worker node key from the worker id, then looks for a
// source worker node that is ready, has a different id and has queued tasks
// (a comparator on queued tasks counts, B minus A, is visible). When a source is
// found, a task is popped from it; executeTask() is guarded by
// shallExecuteTask(destination), enqueueTask() is presumably the alternative
// branch, and the stolen statistics are updated.
// NOTE(review): the calls wrapping the comparator and predicate, and the `else`
// line, are absent from this extract.
1371 private taskStealingOnEmptyQueue (workerId
: number): void {
1372 const destinationWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(workerId
)
1373 const workerNodes
= this.workerNodes
1376 (workerNodeA
, workerNodeB
) =>
1377 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1379 const sourceWorkerNode
= workerNodes
.find(
1381 workerNode
.info
.ready
&&
1382 workerNode
.info
.id
!== workerId
&&
1383 workerNode
.usage
.tasks
.queued
> 0
1385 if (sourceWorkerNode
!= null) {
1386 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1387 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1388 this.executeTask(destinationWorkerNodeKey
, task
)
1390 this.enqueueTask(destinationWorkerNodeKey
, task
)
1392 this.updateTaskStolenStatisticsWorkerUsage(
1393 destinationWorkerNodeKey
,
// Task stealing handler bound as a worker node `onBackPressure` callback.
// With sizeOffset = 1, first tests tasksQueueOptions.size <= sizeOffset (the
// guarded statement is not visible). The source is the worker node resolved from
// the worker id. Worker nodes are then iterated (a comparator on queued tasks
// counts, A minus B, is visible). For each ready node with a different id whose
// queued count is below size - sizeOffset, while the source still has queued
// tasks, a task is popped from the source. executeTask() is guarded by
// shallExecuteTask(node); enqueueTask() is presumably the alternative branch.
// NOTE(review): several interior lines are absent from this extract.
1399 private tasksStealingOnBackPressure (workerId
: number): void {
1400 const sizeOffset
= 1
1401 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1404 const sourceWorkerNode
=
1405 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1406 const workerNodes
= this.workerNodes
1409 (workerNodeA
, workerNodeB
) =>
1410 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1412 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1414 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1415 workerNode
.info
.ready
&&
1416 workerNode
.info
.id
!== workerId
&&
1417 workerNode
.usage
.tasks
.queued
<
1418 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1420 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1421 if (this.shallExecuteTask(workerNodeKey
)) {
1422 this.executeTask(workerNodeKey
, task
)
1424 this.enqueueTask(workerNodeKey
, task
)
1426 this.updateTaskStolenStatisticsWorkerUsage(
/**
 * This method is the listener registered for each worker message.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
// Builds the worker message listener. Each message goes through
// checkMessageWorkerId(), then is dispatched in this order:
//   1. `ready` and `taskFunctionNames` both set -> handleWorkerReadyResponse()
//   2. `taskId` set -> handleTaskExecutionResponse()
//   3. `taskFunctionNames` set -> stored for the worker resolved from message.workerId
// NOTE(review): the line opening the returned function and the target of the
// taskFunctionNames assignment are absent from this extract.
1439 protected workerListener (): (message
: MessageValue
<Response
>) => void {
1441 this.checkMessageWorkerId(message
)
1442 if (message
.ready
!= null && message
.taskFunctionNames
!= null) {
1443 // Worker ready response received from worker
1444 this.handleWorkerReadyResponse(message
)
1445 } else if (message
.taskId
!= null) {
1446 // Task execution response received from worker
1447 this.handleTaskExecutionResponse(message
)
1448 } else if (message
.taskFunctionNames
!= null) {
1449 // Task function names message received from worker
1451 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1452 ).taskFunctionNames
= message
.taskFunctionNames
// Handles a worker ready response. When message.ready === false, a
// "failed to initialize" message is built for the worker id (the statement
// using it is not visible - presumably a throw). Otherwise the worker info
// resolved from message.workerId receives `ready` and `taskFunctionNames`, and
// PoolEvents.ready is emitted with the pool info when an emitter exists and
// `this.ready` is truthy.
1457 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1458 if (message
.ready
=== false) {
1460 `Worker ${message.workerId as number} failed to initialize`
1463 const workerInfo
= this.getWorkerInfo(
1464 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1466 workerInfo
.ready
= message
.ready
as boolean
1467 workerInfo
.taskFunctionNames
= message
.taskFunctionNames
1468 if (this.emitter
!= null && this.ready
) {
1469 this.emitter
.emit(PoolEvents
.ready
, this.info
)
// Handles a task execution response. Looks up the promise wrapper registered
// for message.taskId; when found:
//   - on workerError, emits PoolEvents.taskError and rejects with workerError.message
//   - resolves with `data` (presumably the alternative branch)
//   - runs afterTaskExecutionHook(), updates the worker choice strategy context
//     and deletes the promise map entry
//   - when the tasks queue is enabled, non-empty, and the executing count is
//     below tasksQueueOptions.concurrency, dequeues a task
// NOTE(review): the `else` line and the call consuming the dequeued task are
// absent from this extract.
1473 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1474 const { taskId
, workerError
, data
} = message
1475 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1476 if (promiseResponse
!= null) {
1477 if (workerError
!= null) {
1478 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1479 promiseResponse
.reject(workerError
.message
)
1481 promiseResponse
.resolve(data
as Response
)
1483 const workerNodeKey
= promiseResponse
.workerNodeKey
1484 this.afterTaskExecutionHook(workerNodeKey
, message
)
1485 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1486 this.promiseResponseMap
.delete(taskId
as string)
1488 this.opts
.enableTasksQueue
=== true &&
1489 this.tasksQueueSize(workerNodeKey
) > 0 &&
1490 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
1491 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1495 this.dequeueTask(workerNodeKey
) as Task
<Data
>
// Emits PoolEvents.busy with the pool info.
// NOTE(review): the line between the signature and the emit (original line
// 1502, presumably the guarding condition) is absent from this extract.
1501 private checkAndEmitTaskExecutionEvents (): void {
1503 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
/**
 * Emits the `backPressure` pool event, with the pool information as payload,
 * when the pool reports back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
// For pools whose type is PoolTypes.dynamic, emits PoolEvents.full with the
// pool info.
// NOTE(review): an inner line (original line 1515, presumably a further
// condition) is absent from this extract.
1513 private checkAndEmitDynamicWorkerCreationEvents (): void {
1514 if (this.type === PoolTypes
.dynamic
) {
1516 this.emitter
?.emit(PoolEvents
.full
, this.info
)
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
/**
 * Adds the given worker in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
// Builds a WorkerNode, passing `tasksQueueOptions.size ?? Math.pow(maxSize, 2)`
// to its constructor. Flags it ready while the pool is starting, pushes it to
// workerNodes and returns the key found by getWorkerNodeKeyByWorker().
// Throws an Error when that lookup yields -1.
// NOTE(review): the first constructor argument line is absent from this extract.
1538 private addWorkerNode (worker
: Worker
): number {
1539 const workerNode
= new WorkerNode
<Worker
, Data
>(
1541 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1543 // Flag the worker node as ready at pool startup.
1544 if (this.starting
) {
1545 workerNode
.info
.ready
= true
1547 this.workerNodes
.push(workerNode
)
1548 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1549 if (workerNodeKey
=== -1) {
1550 throw new Error('Worker added not found in worker nodes')
1552 return workerNodeKey
/**
 * Removes the given worker from the pool worker nodes.
 * Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    return
  }
  // Drop the node first, then let the worker choice strategy forget its key.
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
// Combines `opts.enableTasksQueue === true` with the hasBackPressure() result
// of the worker node at the given key.
// NOTE(review): the `return` line is not visible in this extract.
1569 public hasWorkerNodeBackPressure (workerNodeKey
: number): boolean {
1571 this.opts
.enableTasksQueue
=== true &&
1572 this.workerNodes
[workerNodeKey
].hasBackPressure()
// Pool-level back pressure check: requires `opts.enableTasksQueue === true`
// and searches workerNodes with findIndex() for a node that does NOT report
// back pressure.
// NOTE(review): the `return` line and the comparison applied to the findIndex()
// result are absent from this extract.
1576 private hasBackPressure (): boolean {
1578 this.opts
.enableTasksQueue
=== true &&
1579 this.workerNodes
.findIndex(
1580 workerNode
=> !workerNode
.hasBackPressure()
/**
 * Executes the given task on the worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // Read the transfer list after the hook has run, as the task is handed to it first.
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node given its key, then checks
 * whether task queuing events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
/**
 * Dequeues a task from the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
// While the tasks queue of the given worker node is non-empty, dequeues tasks
// from it, then clears the worker node tasks queue.
// NOTE(review): the call consuming each dequeued task is absent from this extract.
1611 protected flushTasksQueue (workerNodeKey
: number): void {
1612 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1615 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1618 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1621 private flushTasksQueues (): void {
1622 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1623 this.flushTasksQueue(workerNodeKey
)