1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
24 import { KillBehaviors
} from
'../worker/worker-options'
25 import type { TaskFunction
} from
'../worker/task-functions'
33 type TasksQueueOptions
40 WorkerNodeEventDetail
,
45 type MeasurementStatisticsRequirements
,
47 WorkerChoiceStrategies
,
48 type WorkerChoiceStrategy
,
49 type WorkerChoiceStrategyOptions
50 } from
'./selection-strategies/selection-strategies-types'
51 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
52 import { version
} from
'./version'
53 import { WorkerNode
} from
'./worker-node'
56 checkValidTasksQueueOptions
,
57 checkValidWorkerChoiceStrategy
,
58 updateMeasurementStatistics
62 * Base class that implements some shared logic for all poolifier pools.
64 * @typeParam Worker - Type of worker which manages this pool.
65 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
66 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
68 export abstract class AbstractPool
<
69 Worker
extends IWorker
,
72 > implements IPool
<Worker
, Data
, Response
> {
74 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
77 public emitter
?: EventEmitterAsyncResource
80 * Dynamic pool maximum size property placeholder.
82 protected readonly max
?: number
85 * The task execution response promise map:
86 * - `key`: The message id of each submitted task.
87 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
89 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
91 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
92 new Map
<string, PromiseResponseWrapper
<Response
>>()
95 * Worker choice strategy context referencing a worker choice algorithm implementation.
97 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
104 * The task functions added at runtime map:
105 * - `key`: The task function name.
106 * - `value`: The task function itself.
108 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
111 * Whether the pool is started or not.
113 private started
: boolean
115 * Whether the pool is starting or not.
117 private starting
: boolean
119 * Whether the pool is destroying or not.
121 private destroying
: boolean
123 * Whether the pool ready event has been emitted or not.
125 private readyEventEmitted
: boolean
127 * The start timestamp of the pool.
129 private readonly startTimestamp
132 * Constructs a new poolifier pool.
134 * @param numberOfWorkers - Number of workers that this pool should manage.
135 * @param filePath - Path to the worker file.
136 * @param opts - Options for the pool.
139 protected readonly numberOfWorkers
: number,
140 protected readonly filePath
: string,
141 protected readonly opts
: PoolOptions
<Worker
>
143 if (!this.isMain()) {
145 'Cannot start a pool from a worker with the same type as the pool'
148 checkFilePath(this.filePath
)
149 this.checkNumberOfWorkers(this.numberOfWorkers
)
150 this.checkPoolOptions(this.opts
)
152 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
153 this.executeTask
= this.executeTask
.bind(this)
154 this.enqueueTask
= this.enqueueTask
.bind(this)
156 if (this.opts
.enableEvents
=== true) {
157 this.initializeEventEmitter()
159 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
165 this.opts
.workerChoiceStrategy
,
166 this.opts
.workerChoiceStrategyOptions
171 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
174 this.starting
= false
175 this.destroying
= false
176 this.readyEventEmitted
= false
177 if (this.opts
.startWorkers
=== true) {
181 this.startTimestamp
= performance
.now()
184 private checkNumberOfWorkers (numberOfWorkers
: number): void {
185 if (numberOfWorkers
== null) {
187 'Cannot instantiate a pool without specifying the number of workers'
189 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
191 'Cannot instantiate a pool with a non safe integer number of workers'
193 } else if (numberOfWorkers
< 0) {
194 throw new RangeError(
195 'Cannot instantiate a pool with a negative number of workers'
197 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
198 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
202 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
203 if (isPlainObject(opts
)) {
204 this.opts
.startWorkers
= opts
.startWorkers
?? true
205 checkValidWorkerChoiceStrategy(
206 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
208 this.opts
.workerChoiceStrategy
=
209 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
210 this.checkValidWorkerChoiceStrategyOptions(
211 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
213 this.opts
.workerChoiceStrategyOptions
= {
214 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
215 ...opts
.workerChoiceStrategyOptions
217 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
218 this.opts
.enableEvents
= opts
.enableEvents
?? true
219 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
220 if (this.opts
.enableTasksQueue
) {
221 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
222 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
223 opts
.tasksQueueOptions
as TasksQueueOptions
227 throw new TypeError('Invalid pool options: must be a plain object')
231 private checkValidWorkerChoiceStrategyOptions (
232 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
235 workerChoiceStrategyOptions
!= null &&
236 !isPlainObject(workerChoiceStrategyOptions
)
239 'Invalid worker choice strategy options: must be a plain object'
243 workerChoiceStrategyOptions
?.retries
!= null &&
244 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
247 'Invalid worker choice strategy options: retries must be an integer'
251 workerChoiceStrategyOptions
?.retries
!= null &&
252 workerChoiceStrategyOptions
.retries
< 0
254 throw new RangeError(
255 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
259 workerChoiceStrategyOptions
?.weights
!= null &&
260 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
263 'Invalid worker choice strategy options: must have a weight for each worker node'
267 workerChoiceStrategyOptions
?.measurement
!= null &&
268 !Object.values(Measurements
).includes(
269 workerChoiceStrategyOptions
.measurement
273 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
278 private initializeEventEmitter (): void {
279 this.emitter
= new EventEmitterAsyncResource({
280 name
: `poolifier:${this.type}-${this.worker}-pool`
285 public get
info (): PoolInfo
{
290 started
: this.started
,
292 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
293 minSize
: this.minSize
,
294 maxSize
: this.maxSize
,
295 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
296 .runTime
.aggregate
&&
297 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
298 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
299 workerNodes
: this.workerNodes
.length
,
300 idleWorkerNodes
: this.workerNodes
.reduce(
301 (accumulator
, workerNode
) =>
302 workerNode
.usage
.tasks
.executing
=== 0
307 busyWorkerNodes
: this.workerNodes
.reduce(
308 (accumulator
, workerNode
) =>
309 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
312 executedTasks
: this.workerNodes
.reduce(
313 (accumulator
, workerNode
) =>
314 accumulator
+ workerNode
.usage
.tasks
.executed
,
317 executingTasks
: this.workerNodes
.reduce(
318 (accumulator
, workerNode
) =>
319 accumulator
+ workerNode
.usage
.tasks
.executing
,
322 ...(this.opts
.enableTasksQueue
=== true && {
323 queuedTasks
: this.workerNodes
.reduce(
324 (accumulator
, workerNode
) =>
325 accumulator
+ workerNode
.usage
.tasks
.queued
,
329 ...(this.opts
.enableTasksQueue
=== true && {
330 maxQueuedTasks
: this.workerNodes
.reduce(
331 (accumulator
, workerNode
) =>
332 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
336 ...(this.opts
.enableTasksQueue
=== true && {
337 backPressure
: this.hasBackPressure()
339 ...(this.opts
.enableTasksQueue
=== true && {
340 stolenTasks
: this.workerNodes
.reduce(
341 (accumulator
, workerNode
) =>
342 accumulator
+ workerNode
.usage
.tasks
.stolen
,
346 failedTasks
: this.workerNodes
.reduce(
347 (accumulator
, workerNode
) =>
348 accumulator
+ workerNode
.usage
.tasks
.failed
,
351 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
352 .runTime
.aggregate
&& {
356 ...this.workerNodes
.map(
357 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
363 ...this.workerNodes
.map(
364 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
368 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
369 .runTime
.average
&& {
372 this.workerNodes
.reduce
<number[]>(
373 (accumulator
, workerNode
) =>
374 accumulator
.concat(workerNode
.usage
.runTime
.history
),
380 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
384 this.workerNodes
.reduce
<number[]>(
385 (accumulator
, workerNode
) =>
386 accumulator
.concat(workerNode
.usage
.runTime
.history
),
394 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
395 .waitTime
.aggregate
&& {
399 ...this.workerNodes
.map(
400 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
406 ...this.workerNodes
.map(
407 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
411 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
412 .waitTime
.average
&& {
415 this.workerNodes
.reduce
<number[]>(
416 (accumulator
, workerNode
) =>
417 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
423 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
424 .waitTime
.median
&& {
427 this.workerNodes
.reduce
<number[]>(
428 (accumulator
, workerNode
) =>
429 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
441 * The pool readiness boolean status.
443 private get
ready (): boolean {
445 this.workerNodes
.reduce(
446 (accumulator
, workerNode
) =>
447 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
456 * The approximate pool utilization.
458 * @returns The pool utilization.
460 private get
utilization (): number {
461 const poolTimeCapacity
=
462 (performance
.now() - this.startTimestamp
) * this.maxSize
463 const totalTasksRunTime
= this.workerNodes
.reduce(
464 (accumulator
, workerNode
) =>
465 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
468 const totalTasksWaitTime
= this.workerNodes
.reduce(
469 (accumulator
, workerNode
) =>
470 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
473 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
479 * If it is `'dynamic'`, it provides the `max` property.
481 protected abstract get
type (): PoolType
486 protected abstract get
worker (): WorkerType
489 * The pool minimum size.
491 protected get
minSize (): number {
492 return this.numberOfWorkers
496 * The pool maximum size.
498 protected get
maxSize (): number {
499 return this.max
?? this.numberOfWorkers
503 * Checks if the worker id sent in the received message from a worker is valid.
505 * @param message - The received message.
506 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
508 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
509 if (message
.workerId
== null) {
510 throw new Error('Worker message received without worker id')
511 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
513 `Worker message received from unknown worker '${message.workerId}'`
519 * Gets the given worker its worker node key.
521 * @param worker - The worker.
522 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
524 private getWorkerNodeKeyByWorker (worker
: Worker
): number {
525 return this.workerNodes
.findIndex(
526 workerNode
=> workerNode
.worker
=== worker
531 * Gets the worker node key given its worker id.
533 * @param workerId - The worker id.
534 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
536 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
537 return this.workerNodes
.findIndex(
538 workerNode
=> workerNode
.info
.id
=== workerId
543 public setWorkerChoiceStrategy (
544 workerChoiceStrategy
: WorkerChoiceStrategy
,
545 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
547 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
548 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
549 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
550 this.opts
.workerChoiceStrategy
552 if (workerChoiceStrategyOptions
!= null) {
553 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
555 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
556 workerNode
.resetUsage()
557 this.sendStatisticsMessageToWorker(workerNodeKey
)
562 public setWorkerChoiceStrategyOptions (
563 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
565 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
566 this.opts
.workerChoiceStrategyOptions
= {
567 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
568 ...workerChoiceStrategyOptions
570 this.workerChoiceStrategyContext
.setOptions(
571 this.opts
.workerChoiceStrategyOptions
576 public enableTasksQueue (
578 tasksQueueOptions
?: TasksQueueOptions
580 if (this.opts
.enableTasksQueue
=== true && !enable
) {
581 this.unsetTaskStealing()
582 this.unsetTasksStealingOnBackPressure()
583 this.flushTasksQueues()
585 this.opts
.enableTasksQueue
= enable
586 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
590 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
591 if (this.opts
.enableTasksQueue
=== true) {
592 checkValidTasksQueueOptions(tasksQueueOptions
)
593 this.opts
.tasksQueueOptions
=
594 this.buildTasksQueueOptions(tasksQueueOptions
)
595 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
596 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
597 this.setTaskStealing()
599 this.unsetTaskStealing()
601 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
602 this.setTasksStealingOnBackPressure()
604 this.unsetTasksStealingOnBackPressure()
606 } else if (this.opts
.tasksQueueOptions
!= null) {
607 delete this.opts
.tasksQueueOptions
611 private buildTasksQueueOptions (
612 tasksQueueOptions
: TasksQueueOptions
613 ): TasksQueueOptions
{
616 size
: Math.pow(this.maxSize
, 2),
619 tasksStealingOnBackPressure
: true
625 private setTasksQueueSize (size
: number): void {
626 for (const workerNode
of this.workerNodes
) {
627 workerNode
.tasksQueueBackPressureSize
= size
631 private setTaskStealing (): void {
632 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
633 this.workerNodes
[workerNodeKey
].on(
635 this.handleIdleWorkerNodeEvent
640 private unsetTaskStealing (): void {
641 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
642 this.workerNodes
[workerNodeKey
].off(
644 this.handleIdleWorkerNodeEvent
649 private setTasksStealingOnBackPressure (): void {
650 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
651 this.workerNodes
[workerNodeKey
].on(
653 this.handleBackPressureEvent
658 private unsetTasksStealingOnBackPressure (): void {
659 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
660 this.workerNodes
[workerNodeKey
].off(
662 this.handleBackPressureEvent
668 * Whether the pool is full or not.
670 * The pool filling boolean status.
672 protected get
full (): boolean {
673 return this.workerNodes
.length
>= this.maxSize
677 * Whether the pool is busy or not.
679 * The pool busyness boolean status.
681 protected abstract get
busy (): boolean
684 * Whether worker nodes are executing concurrently their tasks quota or not.
686 * @returns Worker nodes busyness boolean status.
688 protected internalBusy (): boolean {
689 if (this.opts
.enableTasksQueue
=== true) {
691 this.workerNodes
.findIndex(
693 workerNode
.info
.ready
&&
694 workerNode
.usage
.tasks
.executing
<
695 (this.opts
.tasksQueueOptions
?.concurrency
as number)
700 this.workerNodes
.findIndex(
702 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
707 private async sendTaskFunctionOperationToWorker (
708 workerNodeKey
: number,
709 message
: MessageValue
<Data
>
710 ): Promise
<boolean> {
711 return await new Promise
<boolean>((resolve
, reject
) => {
712 const taskFunctionOperationListener
= (
713 message
: MessageValue
<Response
>
715 this.checkMessageWorkerId(message
)
716 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
718 message
.taskFunctionOperationStatus
!= null &&
719 message
.workerId
=== workerId
721 if (message
.taskFunctionOperationStatus
) {
723 } else if (!message
.taskFunctionOperationStatus
) {
726 `Task function operation '${
727 message.taskFunctionOperation as string
728 }' failed on worker ${message.workerId} with error: '${
729 message.workerError?.message as string
734 this.deregisterWorkerMessageListener(
735 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
736 taskFunctionOperationListener
740 this.registerWorkerMessageListener(
742 taskFunctionOperationListener
744 this.sendToWorker(workerNodeKey
, message
)
748 private async sendTaskFunctionOperationToWorkers (
749 message
: MessageValue
<Data
>
750 ): Promise
<boolean> {
751 return await new Promise
<boolean>((resolve
, reject
) => {
752 const responsesReceived
= new Array<MessageValue
<Response
>>()
753 const taskFunctionOperationsListener
= (
754 message
: MessageValue
<Response
>
756 this.checkMessageWorkerId(message
)
757 if (message
.taskFunctionOperationStatus
!= null) {
758 responsesReceived
.push(message
)
759 if (responsesReceived
.length
=== this.workerNodes
.length
) {
761 responsesReceived
.every(
762 message
=> message
.taskFunctionOperationStatus
=== true
767 responsesReceived
.some(
768 message
=> message
.taskFunctionOperationStatus
=== false
771 const errorResponse
= responsesReceived
.find(
772 response
=> response
.taskFunctionOperationStatus
=== false
776 `Task function operation '${
777 message.taskFunctionOperation as string
778 }' failed on worker ${
779 errorResponse?.workerId as number
781 errorResponse?.workerError?.message as string
786 this.deregisterWorkerMessageListener(
787 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
788 taskFunctionOperationsListener
793 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
794 this.registerWorkerMessageListener(
796 taskFunctionOperationsListener
798 this.sendToWorker(workerNodeKey
, message
)
804 public hasTaskFunction (name
: string): boolean {
805 for (const workerNode
of this.workerNodes
) {
807 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
808 workerNode
.info
.taskFunctionNames
.includes(name
)
817 public async addTaskFunction (
819 fn
: TaskFunction
<Data
, Response
>
820 ): Promise
<boolean> {
821 if (typeof name
!== 'string') {
822 throw new TypeError('name argument must be a string')
824 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
825 throw new TypeError('name argument must not be an empty string')
827 if (typeof fn
!== 'function') {
828 throw new TypeError('fn argument must be a function')
830 const opResult
= await this.sendTaskFunctionOperationToWorkers({
831 taskFunctionOperation
: 'add',
832 taskFunctionName
: name
,
833 taskFunction
: fn
.toString()
835 this.taskFunctions
.set(name
, fn
)
840 public async removeTaskFunction (name
: string): Promise
<boolean> {
841 if (!this.taskFunctions
.has(name
)) {
843 'Cannot remove a task function not handled on the pool side'
846 const opResult
= await this.sendTaskFunctionOperationToWorkers({
847 taskFunctionOperation
: 'remove',
848 taskFunctionName
: name
850 this.deleteTaskFunctionWorkerUsages(name
)
851 this.taskFunctions
.delete(name
)
856 public listTaskFunctionNames (): string[] {
857 for (const workerNode
of this.workerNodes
) {
859 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
860 workerNode
.info
.taskFunctionNames
.length
> 0
862 return workerNode
.info
.taskFunctionNames
869 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
870 return await this.sendTaskFunctionOperationToWorkers({
871 taskFunctionOperation
: 'default',
872 taskFunctionName
: name
876 private deleteTaskFunctionWorkerUsages (name
: string): void {
877 for (const workerNode
of this.workerNodes
) {
878 workerNode
.deleteTaskFunctionWorkerUsage(name
)
882 private shallExecuteTask (workerNodeKey
: number): boolean {
884 this.tasksQueueSize(workerNodeKey
) === 0 &&
885 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
886 (this.opts
.tasksQueueOptions
?.concurrency
as number)
891 public async execute (
894 transferList
?: TransferListItem
[]
895 ): Promise
<Response
> {
896 return await new Promise
<Response
>((resolve
, reject
) => {
898 reject(new Error('Cannot execute a task on not started pool'))
901 if (this.destroying
) {
902 reject(new Error('Cannot execute a task on destroying pool'))
905 if (name
!= null && typeof name
!== 'string') {
906 reject(new TypeError('name argument must be a string'))
911 typeof name
=== 'string' &&
912 name
.trim().length
=== 0
914 reject(new TypeError('name argument must not be an empty string'))
917 if (transferList
!= null && !Array.isArray(transferList
)) {
918 reject(new TypeError('transferList argument must be an array'))
921 const timestamp
= performance
.now()
922 const workerNodeKey
= this.chooseWorkerNode()
923 const task
: Task
<Data
> = {
924 name
: name
?? DEFAULT_TASK_NAME
,
925 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
926 data
: data
?? ({} as Data
),
931 this.promiseResponseMap
.set(task
.taskId
as string, {
937 this.opts
.enableTasksQueue
=== false ||
938 (this.opts
.enableTasksQueue
=== true &&
939 this.shallExecuteTask(workerNodeKey
))
941 this.executeTask(workerNodeKey
, task
)
943 this.enqueueTask(workerNodeKey
, task
)
949 public start (): void {
951 throw new Error('Cannot start an already started pool')
954 throw new Error('Cannot start an already starting pool')
956 if (this.destroying
) {
957 throw new Error('Cannot start a destroying pool')
961 this.workerNodes
.reduce(
962 (accumulator
, workerNode
) =>
963 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
965 ) < this.numberOfWorkers
967 this.createAndSetupWorkerNode()
969 this.starting
= false
974 public async destroy (): Promise
<void> {
976 throw new Error('Cannot destroy an already destroyed pool')
979 throw new Error('Cannot destroy an starting pool')
981 if (this.destroying
) {
982 throw new Error('Cannot destroy an already destroying pool')
984 this.destroying
= true
986 this.workerNodes
.map(async (_
, workerNodeKey
) => {
987 await this.destroyWorkerNode(workerNodeKey
)
990 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
991 this.emitter
?.emitDestroy()
992 this.emitter
?.removeAllListeners()
993 this.readyEventEmitted
= false
994 this.destroying
= false
998 protected async sendKillMessageToWorker (
999 workerNodeKey
: number
1001 await new Promise
<void>((resolve
, reject
) => {
1002 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1003 this.checkMessageWorkerId(message
)
1004 if (message
.kill
=== 'success') {
1006 } else if (message
.kill
=== 'failure') {
1009 `Kill message handling failed on worker ${
1010 message.workerId as number
1016 // FIXME: should be registered only once
1017 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1018 this.sendToWorker(workerNodeKey
, { kill
: true })
1023 * Terminates the worker node given its worker node key.
1025 * @param workerNodeKey - The worker node key.
1027 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
1030 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1031 * Can be overridden.
1035 protected setupHook (): void {
1036 /* Intentionally empty */
1040 * Should return whether the worker is the main worker or not.
1042 protected abstract isMain (): boolean
1045 * Hook executed before the worker task execution.
1046 * Can be overridden.
1048 * @param workerNodeKey - The worker node key.
1049 * @param task - The task to execute.
1051 protected beforeTaskExecutionHook (
1052 workerNodeKey
: number,
1055 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1056 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1057 ++workerUsage
.tasks
.executing
1058 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1061 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1062 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1066 const taskFunctionWorkerUsage
= this.workerNodes
[
1068 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1069 ++taskFunctionWorkerUsage
.tasks
.executing
1070 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1075 * Hook executed after the worker task execution.
1076 * Can be overridden.
1078 * @param workerNodeKey - The worker node key.
1079 * @param message - The received message.
1081 protected afterTaskExecutionHook (
1082 workerNodeKey
: number,
1083 message
: MessageValue
<Response
>
1085 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1086 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1087 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1088 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1089 this.updateEluWorkerUsage(workerUsage
, message
)
1092 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1093 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1094 message
.taskPerformance
?.name
as string
1097 const taskFunctionWorkerUsage
= this.workerNodes
[
1099 ].getTaskFunctionWorkerUsage(
1100 message
.taskPerformance
?.name
as string
1102 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1103 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1104 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
1109 * Whether the worker node shall update its task function worker usage or not.
1111 * @param workerNodeKey - The worker node key.
1112 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1114 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1115 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1117 workerInfo
!= null &&
1118 Array.isArray(workerInfo
.taskFunctionNames
) &&
1119 workerInfo
.taskFunctionNames
.length
> 2
1123 private updateTaskStatisticsWorkerUsage (
1124 workerUsage
: WorkerUsage
,
1125 message
: MessageValue
<Response
>
1127 const workerTaskStatistics
= workerUsage
.tasks
1129 workerTaskStatistics
.executing
!= null &&
1130 workerTaskStatistics
.executing
> 0
1132 --workerTaskStatistics
.executing
1134 if (message
.workerError
== null) {
1135 ++workerTaskStatistics
.executed
1137 ++workerTaskStatistics
.failed
1141 private updateRunTimeWorkerUsage (
1142 workerUsage
: WorkerUsage
,
1143 message
: MessageValue
<Response
>
1145 if (message
.workerError
!= null) {
1148 updateMeasurementStatistics(
1149 workerUsage
.runTime
,
1150 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1151 message
.taskPerformance
?.runTime
?? 0
1155 private updateWaitTimeWorkerUsage (
1156 workerUsage
: WorkerUsage
,
1159 const timestamp
= performance
.now()
1160 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1161 updateMeasurementStatistics(
1162 workerUsage
.waitTime
,
1163 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1168 private updateEluWorkerUsage (
1169 workerUsage
: WorkerUsage
,
1170 message
: MessageValue
<Response
>
1172 if (message
.workerError
!= null) {
1175 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1176 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1177 updateMeasurementStatistics(
1178 workerUsage
.elu
.active
,
1179 eluTaskStatisticsRequirements
,
1180 message
.taskPerformance
?.elu
?.active
?? 0
1182 updateMeasurementStatistics(
1183 workerUsage
.elu
.idle
,
1184 eluTaskStatisticsRequirements
,
1185 message
.taskPerformance
?.elu
?.idle
?? 0
1187 if (eluTaskStatisticsRequirements
.aggregate
) {
1188 if (message
.taskPerformance
?.elu
!= null) {
1189 if (workerUsage
.elu
.utilization
!= null) {
1190 workerUsage
.elu
.utilization
=
1191 (workerUsage
.elu
.utilization
+
1192 message
.taskPerformance
.elu
.utilization
) /
1195 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
1202 * Chooses a worker node for the next task.
1204 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1206 * @returns The chosen worker node key
1208 private chooseWorkerNode (): number {
1209 if (this.shallCreateDynamicWorker()) {
1210 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1212 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1214 return workerNodeKey
1217 return this.workerChoiceStrategyContext
.execute()
1221 * Conditions for dynamic worker creation.
1223 * @returns Whether to create a dynamic worker or not.
1225 private shallCreateDynamicWorker (): boolean {
1226 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
1230 * Sends a message to worker given its worker node key.
1232 * @param workerNodeKey - The worker node key.
1233 * @param message - The message.
1234 * @param transferList - The optional array of transferable objects.
1236 protected abstract sendToWorker (
1237 workerNodeKey
: number,
1238 message
: MessageValue
<Data
>,
1239 transferList
?: TransferListItem
[]
1243 * Creates a new worker.
1245 * @returns Newly created worker.
1247 protected abstract createWorker (): Worker
1250 * Creates a new, completely set up worker node.
1252 * @returns New, completely set up worker node key.
1254 protected createAndSetupWorkerNode (): number {
1255 const worker
= this.createWorker()
1257 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1258 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1259 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1260 worker
.on('error', error
=> {
1261 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1262 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1263 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1264 this.emitter
?.emit(PoolEvents
.error
, error
)
1265 this.workerNodes
[workerNodeKey
].closeChannel()
1270 this.opts
.restartWorkerOnError
=== true
1272 if (workerInfo
.dynamic
) {
1273 this.createAndSetupDynamicWorkerNode()
1275 this.createAndSetupWorkerNode()
1278 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1279 this.redistributeQueuedTasks(workerNodeKey
)
1282 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1283 worker
.once('exit', () => {
1284 this.removeWorkerNode(worker
)
1287 const workerNodeKey
= this.addWorkerNode(worker
)
1289 this.afterWorkerNodeSetup(workerNodeKey
)
1291 return workerNodeKey
1295 * Creates a new, completely set up dynamic worker node.
1297 * @returns New, completely set up dynamic worker node key.
1299 protected createAndSetupDynamicWorkerNode (): number {
1300 const workerNodeKey
= this.createAndSetupWorkerNode()
1301 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1302 this.checkMessageWorkerId(message
)
1303 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1306 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1307 // Kill message received from worker
1309 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1310 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1311 ((this.opts
.enableTasksQueue
=== false &&
1312 workerUsage
.tasks
.executing
=== 0) ||
1313 (this.opts
.enableTasksQueue
=== true &&
1314 workerUsage
.tasks
.executing
=== 0 &&
1315 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1317 // Flag the worker node as not ready immediately
1318 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1319 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1320 this.emitter
?.emit(PoolEvents
.error
, error
)
1324 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1325 this.sendToWorker(workerNodeKey
, {
1328 if (this.taskFunctions
.size
> 0) {
1329 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1330 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1331 taskFunctionOperation
: 'add',
1333 taskFunction
: taskFunction
.toString()
1335 this.emitter
?.emit(PoolEvents
.error
, error
)
1339 workerInfo
.dynamic
= true
1341 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1342 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1344 workerInfo
.ready
= true
1346 this.checkAndEmitDynamicWorkerCreationEvents()
1347 return workerNodeKey
1351 * Registers a listener callback on the worker given its worker node key.
1353 * @param workerNodeKey - The worker node key.
1354 * @param listener - The message listener callback.
1356 protected abstract registerWorkerMessageListener
<
1357 Message
extends Data
| Response
1359 workerNodeKey
: number,
1360 listener
: (message
: MessageValue
<Message
>) => void
1364 * Registers once a listener callback on the worker given its worker node key.
1366 * @param workerNodeKey - The worker node key.
1367 * @param listener - The message listener callback.
1369 protected abstract registerOnceWorkerMessageListener
<
1370 Message
extends Data
| Response
1372 workerNodeKey
: number,
1373 listener
: (message
: MessageValue
<Message
>) => void
1377 * Deregisters a listener callback on the worker given its worker node key.
1379 * @param workerNodeKey - The worker node key.
1380 * @param listener - The message listener callback.
1382 protected abstract deregisterWorkerMessageListener
<
1383 Message
extends Data
| Response
1385 workerNodeKey
: number,
1386 listener
: (message
: MessageValue
<Message
>) => void
1390 * Method hooked up after a worker node has been newly created.
1391 * Can be overridden.
1393 * @param workerNodeKey - The newly created worker node key.
1395 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1396 // Listen to worker messages.
1397 this.registerWorkerMessageListener(
1399 this.workerMessageListener
.bind(this)
1401 // Send the startup message to worker.
1402 this.sendStartupMessageToWorker(workerNodeKey
)
1403 // Send the statistics message to worker.
1404 this.sendStatisticsMessageToWorker(workerNodeKey
)
1405 if (this.opts
.enableTasksQueue
=== true) {
1406 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1407 this.workerNodes
[workerNodeKey
].on(
1409 this.handleIdleWorkerNodeEvent
1412 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1413 this.workerNodes
[workerNodeKey
].on(
1415 this.handleBackPressureEvent
1422 * Sends the startup message to worker given its worker node key.
1424 * @param workerNodeKey - The worker node key.
1426 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1429 * Sends the statistics message to worker given its worker node key.
1431 * @param workerNodeKey - The worker node key.
1433 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1434 this.sendToWorker(workerNodeKey
, {
1437 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1439 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1445 private redistributeQueuedTasks (workerNodeKey
: number): void {
1446 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1447 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1448 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1449 return workerNode
.info
.ready
&&
1450 workerNode
.usage
.tasks
.queued
<
1451 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1457 const task
= this.dequeueTask(workerNodeKey
) as Task
<Data
>
1458 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1459 this.executeTask(destinationWorkerNodeKey
, task
)
1461 this.enqueueTask(destinationWorkerNodeKey
, task
)
1466 private updateTaskStolenStatisticsWorkerUsage (
1467 workerNodeKey
: number,
1470 const workerNode
= this.workerNodes
[workerNodeKey
]
1471 if (workerNode
?.usage
!= null) {
1472 ++workerNode
.usage
.tasks
.stolen
1475 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1476 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1478 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1481 ++taskFunctionWorkerUsage
.tasks
.stolen
1485 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1486 workerNodeKey
: number
1488 const workerNode
= this.workerNodes
[workerNodeKey
]
1489 if (workerNode
?.usage
!= null) {
1490 ++workerNode
.usage
.tasks
.sequentiallyStolen
1494 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1495 workerNodeKey
: number,
1498 const workerNode
= this.workerNodes
[workerNodeKey
]
1500 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1501 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1503 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1506 ++taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
1510 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1511 workerNodeKey
: number
1513 const workerNode
= this.workerNodes
[workerNodeKey
]
1514 if (workerNode
?.usage
!= null) {
1515 workerNode
.usage
.tasks
.sequentiallyStolen
= 0
1519 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1520 workerNodeKey
: number,
1523 const workerNode
= this.workerNodes
[workerNodeKey
]
1525 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1526 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1528 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1531 taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
= 0
1535 private readonly handleIdleWorkerNodeEvent
= (
1536 eventDetail
: WorkerNodeEventDetail
,
1537 previousStolenTask
?: Task
<Data
>
1539 const { workerNodeKey
} = eventDetail
1540 if (workerNodeKey
== null) {
1542 'WorkerNode event detail workerNodeKey attribute must be defined'
1545 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1547 previousStolenTask
!= null &&
1548 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1549 (workerNodeTasksUsage
.executing
> 0 ||
1550 this.tasksQueueSize(workerNodeKey
) > 0)
1552 for (const taskName
of this.workerNodes
[workerNodeKey
].info
1553 .taskFunctionNames
as string[]) {
1554 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1559 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1562 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1564 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1567 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1569 ].getTaskFunctionWorkerUsage(stolenTask
.name
as string)
1570 ?.tasks
as TaskStatistics
1572 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1573 (previousStolenTask
!= null &&
1574 previousStolenTask
.name
=== stolenTask
.name
&&
1575 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1577 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1579 stolenTask
.name
as string
1582 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1584 stolenTask
.name
as string
1588 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1590 this.handleIdleWorkerNodeEvent(eventDetail
, stolenTask
)
1593 .catch(EMPTY_FUNCTION
)
1596 private readonly workerNodeStealTask
= (
1597 workerNodeKey
: number
1598 ): Task
<Data
> | undefined => {
1599 const workerNodes
= this.workerNodes
1602 (workerNodeA
, workerNodeB
) =>
1603 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1605 const sourceWorkerNode
= workerNodes
.find(
1606 (sourceWorkerNode
, sourceWorkerNodeKey
) =>
1607 sourceWorkerNode
.info
.ready
&&
1608 sourceWorkerNodeKey
!== workerNodeKey
&&
1609 sourceWorkerNode
.usage
.tasks
.queued
> 0
1611 if (sourceWorkerNode
!= null) {
1612 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1613 if (this.shallExecuteTask(workerNodeKey
)) {
1614 this.executeTask(workerNodeKey
, task
)
1616 this.enqueueTask(workerNodeKey
, task
)
1618 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1619 this.updateTaskStolenStatisticsWorkerUsage(
1627 private readonly handleBackPressureEvent
= (
1628 eventDetail
: WorkerNodeEventDetail
1630 const { workerId
} = eventDetail
1631 const sizeOffset
= 1
1632 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1635 const sourceWorkerNode
=
1636 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1637 const workerNodes
= this.workerNodes
1640 (workerNodeA
, workerNodeB
) =>
1641 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1643 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1645 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1646 workerNode
.info
.ready
&&
1647 workerNode
.info
.id
!== workerId
&&
1648 workerNode
.usage
.tasks
.queued
<
1649 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1651 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1652 if (this.shallExecuteTask(workerNodeKey
)) {
1653 this.executeTask(workerNodeKey
, task
)
1655 this.enqueueTask(workerNodeKey
, task
)
1657 this.updateTaskStolenStatisticsWorkerUsage(
1666 * This method is the message listener registered on each worker.
1668 protected workerMessageListener (message
: MessageValue
<Response
>): void {
1669 this.checkMessageWorkerId(message
)
1670 const { workerId
, ready
, taskId
, taskFunctionNames
} = message
1671 if (ready
!= null && taskFunctionNames
!= null) {
1672 // Worker ready response received from worker
1673 this.handleWorkerReadyResponse(message
)
1674 } else if (taskId
!= null) {
1675 // Task execution response received from worker
1676 this.handleTaskExecutionResponse(message
)
1677 } else if (taskFunctionNames
!= null) {
1678 // Task function names message received from worker
1680 this.getWorkerNodeKeyByWorkerId(workerId
)
1681 ).taskFunctionNames
= taskFunctionNames
1685 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1686 const { workerId
, ready
, taskFunctionNames
} = message
1687 if (ready
=== false) {
1688 throw new Error(`Worker ${workerId as number} failed to initialize`)
1690 const workerInfo
= this.getWorkerInfo(
1691 this.getWorkerNodeKeyByWorkerId(workerId
)
1693 workerInfo
.ready
= ready
as boolean
1694 workerInfo
.taskFunctionNames
= taskFunctionNames
1695 if (!this.readyEventEmitted
&& this.ready
) {
1696 this.readyEventEmitted
= true
1697 this.emitter
?.emit(PoolEvents
.ready
, this.info
)
1701 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1702 const { workerId
, taskId
, workerError
, data
} = message
1703 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1704 if (promiseResponse
!= null) {
1705 const { resolve
, reject
, workerNodeKey
} = promiseResponse
1706 if (workerError
!= null) {
1707 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1708 reject(workerError
.message
)
1710 resolve(data
as Response
)
1712 this.afterTaskExecutionHook(workerNodeKey
, message
)
1713 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1714 this.promiseResponseMap
.delete(taskId
as string)
1715 if (this.opts
.enableTasksQueue
=== true) {
1716 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1718 this.tasksQueueSize(workerNodeKey
) > 0 &&
1719 workerNodeTasksUsage
.executing
<
1720 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1724 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1728 workerNodeTasksUsage
.executing
=== 0 &&
1729 this.tasksQueueSize(workerNodeKey
) === 0 &&
1730 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1732 this.workerNodes
[workerNodeKey
].emit('idleWorkerNode', {
1733 workerId
: workerId
as number,
1741 private checkAndEmitTaskExecutionEvents (): void {
1743 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1747 private checkAndEmitTaskQueuingEvents (): void {
1748 if (this.hasBackPressure()) {
1749 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1753 private checkAndEmitDynamicWorkerCreationEvents (): void {
1754 if (this.type === PoolTypes
.dynamic
) {
1756 this.emitter
?.emit(PoolEvents
.full
, this.info
)
1762 * Gets the worker information given its worker node key.
1764 * @param workerNodeKey - The worker node key.
1765 * @returns The worker information.
1767 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
{
1768 return this.workerNodes
[workerNodeKey
]?.info
1772 * Adds the given worker in the pool worker nodes.
1774 * @param worker - The worker.
1775 * @returns The added worker node key.
1776 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1778 private addWorkerNode (worker
: Worker
): number {
1779 const workerNode
= new WorkerNode
<Worker
, Data
>(
1781 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1783 // Flag the worker node as ready at pool startup.
1784 if (this.starting
) {
1785 workerNode
.info
.ready
= true
1787 this.workerNodes
.push(workerNode
)
1788 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1789 if (workerNodeKey
=== -1) {
1790 throw new Error('Worker added not found in worker nodes')
1792 return workerNodeKey
1796 * Removes the given worker from the pool worker nodes.
1798 * @param worker - The worker.
1800 private removeWorkerNode (worker
: Worker
): void {
1801 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1802 if (workerNodeKey
!== -1) {
1803 this.workerNodes
.splice(workerNodeKey
, 1)
1804 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
1808 protected flagWorkerNodeAsNotReady (workerNodeKey
: number): void {
1809 this.getWorkerInfo(workerNodeKey
).ready
= false
1813 public hasWorkerNodeBackPressure (workerNodeKey
: number): boolean {
1815 this.opts
.enableTasksQueue
=== true &&
1816 this.workerNodes
[workerNodeKey
].hasBackPressure()
1820 private hasBackPressure (): boolean {
1822 this.opts
.enableTasksQueue
=== true &&
1823 this.workerNodes
.findIndex(
1824 workerNode
=> !workerNode
.hasBackPressure()
1830 * Executes the given task on the worker given its worker node key.
1832 * @param workerNodeKey - The worker node key.
1833 * @param task - The task to execute.
1835 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1836 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1837 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1838 this.checkAndEmitTaskExecutionEvents()
1841 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1842 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1843 this.checkAndEmitTaskQueuingEvents()
1844 return tasksQueueSize
1847 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1848 return this.workerNodes
[workerNodeKey
].dequeueTask()
1851 private tasksQueueSize (workerNodeKey
: number): number {
1852 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1855 protected flushTasksQueue (workerNodeKey
: number): void {
1856 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1859 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1862 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1865 private flushTasksQueues (): void {
1866 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1867 this.flushTasksQueue(workerNodeKey
)