1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
24 import { KillBehaviors
} from
'../worker/worker-options'
25 import type { TaskFunction
} from
'../worker/task-functions'
33 type TasksQueueOptions
40 WorkerNodeEventDetail
,
45 type MeasurementStatisticsRequirements
,
47 WorkerChoiceStrategies
,
48 type WorkerChoiceStrategy
,
49 type WorkerChoiceStrategyOptions
50 } from
'./selection-strategies/selection-strategies-types'
51 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
52 import { version
} from
'./version'
53 import { WorkerNode
} from
'./worker-node'
56 checkValidTasksQueueOptions
,
57 checkValidWorkerChoiceStrategy
,
58 updateMeasurementStatistics
62 * Base class that implements some shared logic for all poolifier pools.
64 * @typeParam Worker - Type of worker which manages this pool.
65 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
66 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
68 export abstract class AbstractPool
<
69 Worker
extends IWorker
,
72 > implements IPool
<Worker
, Data
, Response
> {
74 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
77 public emitter
?: EventEmitterAsyncResource
80 * Dynamic pool maximum size property placeholder.
82 protected readonly max
?: number
85 * The task execution response promise map:
86 * - `key`: The message id of each submitted task.
87 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
89 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
91 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
92 new Map
<string, PromiseResponseWrapper
<Response
>>()
95 * Worker choice strategy context referencing a worker choice algorithm implementation.
97 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
104 * The task functions added at runtime map:
105 * - `key`: The task function name.
106 * - `value`: The task function itself.
108 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
111 * Whether the pool is started or not.
113 private started
: boolean
115 * Whether the pool is starting or not.
117 private starting
: boolean
119 * Whether the pool is destroying or not.
121 private destroying
: boolean
123 * Whether the pool ready event has been emitted or not.
125 private readyEventEmitted
: boolean
127 * The start timestamp of the pool.
129 private readonly startTimestamp
132 * Constructs a new poolifier pool.
134 * @param numberOfWorkers - Number of workers that this pool should manage.
135 * @param filePath - Path to the worker file.
136 * @param opts - Options for the pool.
139 protected readonly numberOfWorkers
: number,
140 protected readonly filePath
: string,
141 protected readonly opts
: PoolOptions
<Worker
>
143 if (!this.isMain()) {
145 'Cannot start a pool from a worker with the same type as the pool'
148 checkFilePath(this.filePath
)
149 this.checkNumberOfWorkers(this.numberOfWorkers
)
150 this.checkPoolOptions(this.opts
)
152 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
153 this.executeTask
= this.executeTask
.bind(this)
154 this.enqueueTask
= this.enqueueTask
.bind(this)
156 if (this.opts
.enableEvents
=== true) {
157 this.initializeEventEmitter()
159 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
165 this.opts
.workerChoiceStrategy
,
166 this.opts
.workerChoiceStrategyOptions
171 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
174 this.starting
= false
175 this.destroying
= false
176 this.readyEventEmitted
= false
177 if (this.opts
.startWorkers
=== true) {
181 this.startTimestamp
= performance
.now()
184 private checkNumberOfWorkers (numberOfWorkers
: number): void {
185 if (numberOfWorkers
== null) {
187 'Cannot instantiate a pool without specifying the number of workers'
189 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
191 'Cannot instantiate a pool with a non safe integer number of workers'
193 } else if (numberOfWorkers
< 0) {
194 throw new RangeError(
195 'Cannot instantiate a pool with a negative number of workers'
197 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
198 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
/**
 * Validates the given pool options and fills in the default value of every unset option.
 *
 * Defaults applied: `startWorkers` is `true`, `workerChoiceStrategy` is round robin,
 * `restartWorkerOnError` is `true`, `enableEvents` is `true`, `enableTasksQueue` is `false`.
 * The worker choice strategy options are merged over `DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS`.
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  // Reject anything that is not a plain object before reading any option.
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.startWorkers = opts.startWorkers ?? true
  // Each option is validated before its default is applied.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  poolOptions.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and built when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
231 private checkValidWorkerChoiceStrategyOptions (
232 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
235 workerChoiceStrategyOptions
!= null &&
236 !isPlainObject(workerChoiceStrategyOptions
)
239 'Invalid worker choice strategy options: must be a plain object'
243 workerChoiceStrategyOptions
?.retries
!= null &&
244 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
247 'Invalid worker choice strategy options: retries must be an integer'
251 workerChoiceStrategyOptions
?.retries
!= null &&
252 workerChoiceStrategyOptions
.retries
< 0
254 throw new RangeError(
255 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
259 workerChoiceStrategyOptions
?.weights
!= null &&
260 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
263 'Invalid worker choice strategy options: must have a weight for each worker node'
267 workerChoiceStrategyOptions
?.measurement
!= null &&
268 !Object.values(Measurements
).includes(
269 workerChoiceStrategyOptions
.measurement
273 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
285 public get
info (): PoolInfo
{
290 started
: this.started
,
292 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
293 minSize
: this.minSize
,
294 maxSize
: this.maxSize
,
295 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
296 .runTime
.aggregate
&&
297 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
298 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
299 workerNodes
: this.workerNodes
.length
,
300 idleWorkerNodes
: this.workerNodes
.reduce(
301 (accumulator
, workerNode
) =>
302 workerNode
.usage
.tasks
.executing
=== 0
307 busyWorkerNodes
: this.workerNodes
.reduce(
308 (accumulator
, workerNode
) =>
309 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
312 executedTasks
: this.workerNodes
.reduce(
313 (accumulator
, workerNode
) =>
314 accumulator
+ workerNode
.usage
.tasks
.executed
,
317 executingTasks
: this.workerNodes
.reduce(
318 (accumulator
, workerNode
) =>
319 accumulator
+ workerNode
.usage
.tasks
.executing
,
322 ...(this.opts
.enableTasksQueue
=== true && {
323 queuedTasks
: this.workerNodes
.reduce(
324 (accumulator
, workerNode
) =>
325 accumulator
+ workerNode
.usage
.tasks
.queued
,
329 ...(this.opts
.enableTasksQueue
=== true && {
330 maxQueuedTasks
: this.workerNodes
.reduce(
331 (accumulator
, workerNode
) =>
332 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
336 ...(this.opts
.enableTasksQueue
=== true && {
337 backPressure
: this.hasBackPressure()
339 ...(this.opts
.enableTasksQueue
=== true && {
340 stolenTasks
: this.workerNodes
.reduce(
341 (accumulator
, workerNode
) =>
342 accumulator
+ workerNode
.usage
.tasks
.stolen
,
346 failedTasks
: this.workerNodes
.reduce(
347 (accumulator
, workerNode
) =>
348 accumulator
+ workerNode
.usage
.tasks
.failed
,
351 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
352 .runTime
.aggregate
&& {
356 ...this.workerNodes
.map(
357 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
363 ...this.workerNodes
.map(
364 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
368 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
369 .runTime
.average
&& {
372 this.workerNodes
.reduce
<number[]>(
373 (accumulator
, workerNode
) =>
374 accumulator
.concat(workerNode
.usage
.runTime
.history
),
380 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
384 this.workerNodes
.reduce
<number[]>(
385 (accumulator
, workerNode
) =>
386 accumulator
.concat(workerNode
.usage
.runTime
.history
),
394 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
395 .waitTime
.aggregate
&& {
399 ...this.workerNodes
.map(
400 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
406 ...this.workerNodes
.map(
407 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
411 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
412 .waitTime
.average
&& {
415 this.workerNodes
.reduce
<number[]>(
416 (accumulator
, workerNode
) =>
417 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
423 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
424 .waitTime
.median
&& {
427 this.workerNodes
.reduce
<number[]>(
428 (accumulator
, workerNode
) =>
429 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
441 * The pool readiness boolean status.
443 private get
ready (): boolean {
445 this.workerNodes
.reduce(
446 (accumulator
, workerNode
) =>
447 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated tasks run time plus the aggregated tasks wait time of all
 * worker nodes, divided by the time elapsed since the pool start timestamp multiplied
 * by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    // A worker node without aggregated measurement contributes nothing.
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
479 * If it is `'dynamic'`, it provides the `max` property.
481 protected abstract get
type (): PoolType
486 protected abstract get
worker (): WorkerType
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 *
 * @returns The pool minimum size.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
/**
 * The pool maximum size: the `max` property when it is defined,
 * the number of workers given at construction otherwise.
 *
 * @returns The pool maximum size.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or matches no worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  if (message.workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // -1 means that no worker node carries this worker id.
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const ownsWorker = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(ownsWorker)
}
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  const hasWorkerId = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
/**
 * Sets the worker choice strategy of the pool, then resets the usage of every
 * worker node and sends it a statistics message.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @param workerChoiceStrategyOptions - The optional worker choice strategy options, applied when given.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  // Validate first so that an invalid strategy leaves the pool options untouched.
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
/**
 * Sets the worker choice strategy options of the pool.
 *
 * The given options are validated, merged over `DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS`,
 * stored in the pool options and handed to the worker choice strategy context.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const mergedOptions: WorkerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
/**
 * Enables or disables the worker node tasks queue of the pool.
 *
 * When a currently enabled queue gets disabled, the task stealing listeners are
 * removed and the tasks queues are flushed before the option is switched off.
 *
 * @param enable - Whether to enable or disable the tasks queue.
 * @param tasksQueueOptions - The optional tasks queue options.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
/**
 * Sets the tasks queue options of the pool.
 *
 * With the tasks queue enabled, the options are validated, built, stored, their size is
 * applied to every worker node and the task stealing listeners are re-registered according
 * to the `taskStealing` and `tasksStealingOnBackPressure` options.
 * With the tasks queue disabled, any stored tasks queue options are deleted.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
  // Listeners are always removed first so that they never get registered twice.
  const stealWhenIdle = this.opts.tasksQueueOptions.taskStealing === true
  this.unsetTaskStealing()
  if (stealWhenIdle) {
    this.setTaskStealing()
  }
  const stealOnBackPressure =
    this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true
  this.unsetTasksStealingOnBackPressure()
  if (stealOnBackPressure) {
    this.setTasksStealingOnBackPressure()
  }
}
613 private buildTasksQueueOptions (
614 tasksQueueOptions
: TasksQueueOptions
615 ): TasksQueueOptions
{
618 size
: Math.pow(this.maxSize
, 2),
621 tasksStealingOnBackPressure
: true
/**
 * Applies the given size as the tasks queue back pressure size of every worker node.
 *
 * @param size - The size to apply.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
633 private setTaskStealing (): void {
634 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
635 this.workerNodes
[workerNodeKey
].on(
637 this.handleIdleWorkerNodeEvent
642 private unsetTaskStealing (): void {
643 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
644 this.workerNodes
[workerNodeKey
].off(
646 this.handleIdleWorkerNodeEvent
651 private setTasksStealingOnBackPressure (): void {
652 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
653 this.workerNodes
[workerNodeKey
].on(
655 this.handleBackPressureEvent
660 private unsetTasksStealingOnBackPressure (): void {
661 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
662 this.workerNodes
[workerNodeKey
].off(
664 this.handleBackPressureEvent
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes
 * has reached the pool maximum size.
 *
 * @returns The pool filling boolean status.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
679 * Whether the pool is busy or not.
681 * The pool busyness boolean status.
683 protected abstract get
busy (): boolean
686 * Whether worker nodes are executing concurrently their tasks quota or not.
688 * @returns Worker nodes busyness boolean status.
690 protected internalBusy (): boolean {
691 if (this.opts
.enableTasksQueue
=== true) {
693 this.workerNodes
.findIndex(
695 workerNode
.info
.ready
&&
696 workerNode
.usage
.tasks
.executing
<
697 (this.opts
.tasksQueueOptions
?.concurrency
as number)
702 this.workerNodes
.findIndex(
704 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
709 private async sendTaskFunctionOperationToWorker (
710 workerNodeKey
: number,
711 message
: MessageValue
<Data
>
712 ): Promise
<boolean> {
713 return await new Promise
<boolean>((resolve
, reject
) => {
714 const taskFunctionOperationListener
= (
715 message
: MessageValue
<Response
>
717 this.checkMessageWorkerId(message
)
718 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
720 message
.taskFunctionOperationStatus
!= null &&
721 message
.workerId
=== workerId
723 if (message
.taskFunctionOperationStatus
) {
725 } else if (!message
.taskFunctionOperationStatus
) {
728 `Task function operation '${
729 message.taskFunctionOperation as string
730 }' failed on worker ${message.workerId} with error: '${
731 message.workerError?.message as string
736 this.deregisterWorkerMessageListener(
737 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
738 taskFunctionOperationListener
742 this.registerWorkerMessageListener(
744 taskFunctionOperationListener
746 this.sendToWorker(workerNodeKey
, message
)
750 private async sendTaskFunctionOperationToWorkers (
751 message
: MessageValue
<Data
>
752 ): Promise
<boolean> {
753 return await new Promise
<boolean>((resolve
, reject
) => {
754 const responsesReceived
= new Array<MessageValue
<Response
>>()
755 const taskFunctionOperationsListener
= (
756 message
: MessageValue
<Response
>
758 this.checkMessageWorkerId(message
)
759 if (message
.taskFunctionOperationStatus
!= null) {
760 responsesReceived
.push(message
)
761 if (responsesReceived
.length
=== this.workerNodes
.length
) {
763 responsesReceived
.every(
764 message
=> message
.taskFunctionOperationStatus
=== true
769 responsesReceived
.some(
770 message
=> message
.taskFunctionOperationStatus
=== false
773 const errorResponse
= responsesReceived
.find(
774 response
=> response
.taskFunctionOperationStatus
=== false
778 `Task function operation '${
779 message.taskFunctionOperation as string
780 }' failed on worker ${
781 errorResponse?.workerId as number
783 errorResponse?.workerError?.message as string
788 this.deregisterWorkerMessageListener(
789 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
790 taskFunctionOperationsListener
795 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
796 this.registerWorkerMessageListener(
798 taskFunctionOperationsListener
800 this.sendToWorker(workerNodeKey
, message
)
/**
 * Whether at least one worker node lists the given task function name.
 *
 * @param name - The task function name.
 * @returns `true` if a worker node has the task function name, `false` otherwise.
 */
public hasTaskFunction (name: string): boolean {
  return this.workerNodes.some(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.includes(name)
  )
}
/**
 * Adds a task function: sends an `'add'` task function operation carrying the
 * function source to the workers, then records the function on the pool side.
 *
 * @param name - The task function name.
 * @param fn - The task function.
 * @returns The result of the task function operation sent to the workers.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `name` is not a non-empty string or `fn` is not a function.
 */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // name is known to be a string from here on.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  this.taskFunctions.set(name, fn)
  return opResult
}
842 public async removeTaskFunction (name
: string): Promise
<boolean> {
843 if (!this.taskFunctions
.has(name
)) {
845 'Cannot remove a task function not handled on the pool side'
848 const opResult
= await this.sendTaskFunctionOperationToWorkers({
849 taskFunctionOperation
: 'remove',
850 taskFunctionName
: name
852 this.deleteTaskFunctionWorkerUsages(name
)
853 this.taskFunctions
.delete(name
)
/**
 * Lists the task function names, taken from the first worker node
 * that reports a non-empty list of task function names.
 *
 * @returns The task function names, an empty array if no worker node reports any.
 */
public listTaskFunctionNames (): string[] {
  const reportingWorkerNode = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return reportingWorkerNode?.info.taskFunctionNames ?? []
}
/**
 * Sets the default task function by sending a `'default'` task function operation to the workers.
 *
 * @param name - The name of the task function to use as default.
 * @returns The result of the task function operation sent to the workers.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return opResult
}
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
/**
 * Whether a task shall be executed right away on the given worker node: its tasks queue
 * is empty and it executes fewer tasks than the tasks queue `concurrency` option.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  return executingTasks < concurrency
}
893 public async execute (
896 transferList
?: TransferListItem
[]
897 ): Promise
<Response
> {
898 return await new Promise
<Response
>((resolve
, reject
) => {
900 reject(new Error('Cannot execute a task on not started pool'))
903 if (this.destroying
) {
904 reject(new Error('Cannot execute a task on destroying pool'))
907 if (name
!= null && typeof name
!== 'string') {
908 reject(new TypeError('name argument must be a string'))
913 typeof name
=== 'string' &&
914 name
.trim().length
=== 0
916 reject(new TypeError('name argument must not be an empty string'))
919 if (transferList
!= null && !Array.isArray(transferList
)) {
920 reject(new TypeError('transferList argument must be an array'))
923 const timestamp
= performance
.now()
924 const workerNodeKey
= this.chooseWorkerNode()
925 const task
: Task
<Data
> = {
926 name
: name
?? DEFAULT_TASK_NAME
,
927 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
928 data
: data
?? ({} as Data
),
933 this.promiseResponseMap
.set(task
.taskId
as string, {
939 this.opts
.enableTasksQueue
=== false ||
940 (this.opts
.enableTasksQueue
=== true &&
941 this.shallExecuteTask(workerNodeKey
))
943 this.executeTask(workerNodeKey
, task
)
945 this.enqueueTask(workerNodeKey
, task
)
951 public start (): void {
953 throw new Error('Cannot start an already started pool')
956 throw new Error('Cannot start an already starting pool')
958 if (this.destroying
) {
959 throw new Error('Cannot start a destroying pool')
963 this.workerNodes
.reduce(
964 (accumulator
, workerNode
) =>
965 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
967 ) < this.numberOfWorkers
969 this.createAndSetupWorkerNode()
971 this.starting
= false
976 public async destroy (): Promise
<void> {
978 throw new Error('Cannot destroy an already destroyed pool')
981 throw new Error('Cannot destroy an starting pool')
983 if (this.destroying
) {
984 throw new Error('Cannot destroy an already destroying pool')
986 this.destroying
= true
988 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
989 await this.destroyWorkerNode(workerNodeKey
)
992 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
993 this.emitter
?.emitDestroy()
994 this.emitter
?.removeAllListeners()
995 this.readyEventEmitted
= false
996 this.destroying
= false
1000 protected async sendKillMessageToWorker (
1001 workerNodeKey
: number
1003 await new Promise
<void>((resolve
, reject
) => {
1004 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1005 this.checkMessageWorkerId(message
)
1006 if (message
.kill
=== 'success') {
1008 } else if (message
.kill
=== 'failure') {
1011 `Kill message handling failed on worker ${
1012 message.workerId as number
1018 // FIXME: should be registered only once
1019 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1020 this.sendToWorker(workerNodeKey
, { kill
: true })
1025 * Terminates the worker node given its worker node key.
1027 * @param workerNodeKey - The worker node key.
1029 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Does nothing by default and can be overridden.
 */
protected setupHook (): void {
  // Intentionally empty
}
1042 * Should return whether the worker is the main worker or not.
1044 protected abstract isMain (): boolean
1047 * Hook executed before the worker task execution.
1048 * Can be overridden.
1050 * @param workerNodeKey - The worker node key.
1051 * @param task - The task to execute.
1053 protected beforeTaskExecutionHook (
1054 workerNodeKey
: number,
1057 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1058 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1059 ++workerUsage
.tasks
.executing
1060 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1063 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1064 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1068 const taskFunctionWorkerUsage
= this.workerNodes
[
1070 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1071 ++taskFunctionWorkerUsage
.tasks
.executing
1072 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1077 * Hook executed after the worker task execution.
1078 * Can be overridden.
1080 * @param workerNodeKey - The worker node key.
1081 * @param message - The received message.
1083 protected afterTaskExecutionHook (
1084 workerNodeKey
: number,
1085 message
: MessageValue
<Response
>
1087 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1088 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1089 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1090 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1091 this.updateEluWorkerUsage(workerUsage
, message
)
1094 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1095 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1096 message
.taskPerformance
?.name
as string
1099 const taskFunctionWorkerUsage
= this.workerNodes
[
1101 ].getTaskFunctionWorkerUsage(
1102 message
.taskPerformance
?.name
as string
1104 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1105 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1106 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
/**
 * Whether the worker node shall update its task function worker usage or not.
 * This is the case when its worker info lists more than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return false
  }
  const { taskFunctionNames } = workerInfo
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
/**
 * Updates the tasks counters of the given worker usage from a task response message:
 * one executing task less (never going below zero), and one more executed task
 * or one more failed task depending on the presence of a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null && tasks.executing > 0) {
    tasks.executing -= 1
  }
  const hasFailed = message.workerError != null
  if (hasFailed) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
/**
 * Updates the run time statistics of the given worker usage with the task run time
 * carried by the message. Nothing is updated when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const runTimeStatistics = workerUsage.runTime
    const runTimeRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    // A message without task run time counts as zero.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      runTimeStatistics,
      runTimeRequirements,
      taskRunTime
    )
  }
}
/**
 * Updates the wait time statistics of the given worker usage with the time elapsed
 * since the task timestamp. A task without timestamp has a wait time of zero.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeStatistics = workerUsage.waitTime
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    waitTimeStatistics,
    waitTimeRequirements,
    taskWaitTime
  )
}
1170 private updateEluWorkerUsage (
1171 workerUsage
: WorkerUsage
,
1172 message
: MessageValue
<Response
>
1174 if (message
.workerError
!= null) {
1177 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1178 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1179 updateMeasurementStatistics(
1180 workerUsage
.elu
.active
,
1181 eluTaskStatisticsRequirements
,
1182 message
.taskPerformance
?.elu
?.active
?? 0
1184 updateMeasurementStatistics(
1185 workerUsage
.elu
.idle
,
1186 eluTaskStatisticsRequirements
,
1187 message
.taskPerformance
?.elu
?.idle
?? 0
1189 if (eluTaskStatisticsRequirements
.aggregate
) {
1190 if (message
.taskPerformance
?.elu
!= null) {
1191 if (workerUsage
.elu
.utilization
!= null) {
1192 workerUsage
.elu
.utilization
=
1193 (workerUsage
.elu
.utilization
+
1194 message
.taskPerformance
.elu
.utilization
) /
1197 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * When a dynamic worker node gets created and the strategy policy allows dynamic worker
 * usage, that new worker node is chosen; otherwise the choice is left to the strategy.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full
 * and its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1232 * Sends a message to worker given its worker node key.
1234 * @param workerNodeKey - The worker node key.
1235 * @param message - The message.
1236 * @param transferList - The optional array of transferable objects.
1238 protected abstract sendToWorker (
1239 workerNodeKey
: number,
1240 message
: MessageValue
<Data
>,
1241 transferList
?: TransferListItem
[]
1245 * Creates a new worker.
1247 * @returns Newly created worker.
1249 protected abstract createWorker (): Worker
1252 * Creates a new, completely set up worker node.
1254 * @returns New, completely set up worker node key.
1256 protected createAndSetupWorkerNode (): number {
1257 const worker
= this.createWorker()
1259 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1260 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1261 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1262 worker
.on('error', error
=> {
1263 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1264 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1265 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1266 this.emitter
?.emit(PoolEvents
.error
, error
)
1267 this.workerNodes
[workerNodeKey
].closeChannel()
1272 this.opts
.restartWorkerOnError
=== true
1274 if (workerInfo
.dynamic
) {
1275 this.createAndSetupDynamicWorkerNode()
1277 this.createAndSetupWorkerNode()
1280 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1281 this.redistributeQueuedTasks(workerNodeKey
)
1284 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1285 worker
.once('exit', () => {
1286 this.removeWorkerNode(worker
)
1289 const workerNodeKey
= this.addWorkerNode(worker
)
1291 this.afterWorkerNodeSetup(workerNodeKey
)
1293 return workerNodeKey
/**
 * Creates a new, completely set up dynamic worker node.
 * On top of the regular setup, a listener handling the kill messages sent by
 * the worker is registered, the worker is asked to check its activity, the
 * pool task functions are sent to it and its info is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    // The key is resolved again from the worker id carried by the message.
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // Kill message received from worker
    let shallDestroy = isKillBehavior(KillBehaviors.HARD, message.kill)
    if (!shallDestroy && isKillBehavior(KillBehaviors.SOFT, message.kill)) {
      // A soft kill is only honored when the worker has nothing left to do.
      const { enableTasksQueue } = this.opts
      shallDestroy =
        senderUsage.tasks.executing === 0 &&
        (enableTasksQueue === false ||
          (enableTasksQueue === true &&
            this.tasksQueueSize(senderWorkerNodeKey) === 0))
    }
    if (shallDestroy) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(senderWorkerNodeKey)
      this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Send the task functions registered on the pool to the new worker.
  if (this.taskFunctions.size > 0) {
    for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
      this.sendTaskFunctionOperationToWorker(workerNodeKey, {
        taskFunctionOperation: 'add',
        taskFunctionName,
        taskFunction: taskFunction.toString()
      }).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  }
  workerInfo.dynamic = true
  // The strategy policy decides if a dynamic worker is flagged as ready
  // right away.
  const readyAtCreation =
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  if (readyAtCreation) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
/**
 * Registers a message listener callback on the worker identified by its
 * worker node key. Must be implemented by the concrete pool classes.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Registers once a message listener callback on the worker identified by its
 * worker node key. Must be implemented by the concrete pool classes.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Deregisters a message listener callback from the worker identified by its
 * worker node key. Must be implemented by the concrete pool classes.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Method hooked up after a worker node has been newly created.
 * Registers the pool message listener, sends the startup and statistics
 * messages and, when the tasks queue is enabled, attaches the task stealing
 * event handlers requested by the tasks queue options.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Task stealing only makes sense with the tasks queue enabled.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent
    )
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleBackPressureEvent
    )
  }
}
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool classes.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
 * Sends the statistics message to the worker identified by its worker node
 * key. The message carries the `aggregate` flags of the run time and ELU
 * task statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: taskStatisticsRequirements.runTime.aggregate,
    elu: taskStatisticsRequirements.elu.aggregate
  }
  this.sendToWorker(workerNodeKey, { statistics })
}
/**
 * Moves every task still queued on the given worker node to other worker
 * nodes of the pool.
 *
 * The source worker node is never selected as a destination: handing a task
 * back to the node being drained would either loop forever or send the task
 * to the worker the tasks are being moved away from. The ready worker node
 * with the fewest queued tasks is preferred; when no other worker node is
 * ready the first other worker node is used. When the pool has no other
 * worker node the remaining tasks are left in place.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  // Unknown worker node (e.g. key -1): nothing to redistribute.
  if (this.workerNodes[workerNodeKey] == null) {
    return
  }
  const selectDestinationWorkerNodeKey = (): number => {
    let destinationWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      if (candidateWorkerNodeKey === workerNodeKey) {
        continue
      }
      if (destinationWorkerNodeKey === -1) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
        continue
      }
      const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
      if (
        candidateWorkerNode.info.ready &&
        (!destinationWorkerNode.info.ready ||
          candidateWorkerNode.usage.tasks.queued <
            destinationWorkerNode.usage.tasks.queued)
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    return destinationWorkerNodeKey
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // Re-evaluated for each task since queue sizes change as tasks move.
    const destinationWorkerNodeKey = selectDestinationWorkerNodeKey()
    if (destinationWorkerNodeKey === -1) {
      // No other worker node to hand the tasks to.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
/**
 * Increments the stolen tasks counter of the given worker node and, when
 * task function usage is tracked, of the matching task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (stealingWorkerNode?.usage != null) {
    stealingWorkerNode.usage.tasks.stolen += 1
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage =
    stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    taskFunctionUsage.tasks.stolen += 1
  }
}
/**
 * Increments the sequentially stolen tasks counter of the given worker node.
 * Does nothing when the worker node or its usage is missing.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage == null) {
    return
  }
  usage.tasks.sequentiallyStolen += 1
}
/**
 * Increments the sequentially stolen tasks counter of the usage tracked for
 * the given task function on the given worker node, when such tracking
 * applies.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage =
    targetWorkerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    taskFunctionUsage.tasks.sequentiallyStolen += 1
  }
}
/**
 * Resets to zero the sequentially stolen tasks counter of the given worker
 * node. Does nothing when the worker node or its usage is missing.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage == null) {
    return
  }
  usage.tasks.sequentiallyStolen = 0
}
/**
 * Resets to zero the sequentially stolen tasks counter of the usage tracked
 * for the given task function on the given worker node, when such tracking
 * applies.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage =
    targetWorkerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    taskFunctionUsage.tasks.sequentiallyStolen = 0
  }
}
/**
 * Handles the idle worker node event: the idle worker node tries to steal a
 * task from another worker node, then this handler re-schedules itself
 * after a delay derived from the number of sequentially stolen tasks.
 * The chain stops, and the sequentially stolen counters are reset, once a
 * previous steal happened and the worker node has work again.
 *
 * @param eventDetail - The worker node event detail.
 * @param previousStolenTask - The task stolen by the previous invocation of the chain, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no worker node key.
 */
private readonly handleIdleWorkerNodeEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey attribute must be defined'
    )
  }
  const idleWorkerNode = this.workerNodes[workerNodeKey]
  const tasksUsage = idleWorkerNode.usage.tasks
  const stealingChainShallStop =
    previousStolenTask != null &&
    tasksUsage.sequentiallyStolen > 0 &&
    (tasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0)
  if (stealingChainShallStop) {
    const taskFunctionNames = idleWorkerNode.info.taskFunctionNames as string[]
    for (const taskName of taskFunctionNames) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    const stolenTaskName = stolenTask.name as string
    const taskFunctionTasksUsage = idleWorkerNode.getTaskFunctionWorkerUsage(
      stolenTaskName
    )?.tasks as TaskStatistics
    // The per task function sequence goes on when it just starts or when
    // the same task function was stolen by the previous invocation.
    const sequenceGoesOn =
      taskFunctionTasksUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksUsage.sequentiallyStolen > 0)
    if (sequenceGoesOn) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    }
  }
  // Re-run after a delay; errors raised by the chained run are swallowed.
  sleep(exponentialDelay(tasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
/**
 * Makes the given worker node steal one task from the ready worker node
 * having the most queued tasks.
 *
 * The stealing worker node is excluded from the candidates by identity:
 * candidates are looked up in a sorted copy of the worker nodes, whose
 * indexes do not match the worker node keys.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, or `undefined` if no worker node had a task to steal.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  // Sorted copy, most queued tasks first; the pool array is left untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  const sourceWorkerNode = workerNodes.find(
    candidateWorkerNode =>
      candidateWorkerNode.info.ready &&
      candidateWorkerNode !== destinationWorkerNode &&
      candidateWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode != null) {
    const task = sourceWorkerNode.popTask() as Task<Data>
    if (this.shallExecuteTask(workerNodeKey)) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    this.updateTaskStolenStatisticsWorkerUsage(
      workerNodeKey,
      task.name as string
    )
    return task
  }
}
/**
 * Handles the back pressure event of a worker node: tasks are popped from
 * the worker node under back pressure and handed to the other ready worker
 * nodes, least queued first, as long as their queue stays below the tasks
 * queue size minus an offset of one.
 *
 * Destination worker nodes are visited through a sorted copy of the worker
 * nodes; their actual worker node key is resolved from the pool worker
 * nodes since the indexes of the sorted copy do not match the keys.
 *
 * @param eventDetail - The worker node event detail.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  const { workerId } = eventDetail
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // The worker node may already be gone from the pool.
  if (sourceWorkerNode == null) {
    return
  }
  // Sorted copy, least queued tasks first; the pool array is left untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      const destinationWorkerNodeKey = this.workerNodes.indexOf(workerNode)
      if (destinationWorkerNodeKey === -1) {
        continue
      }
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(destinationWorkerNodeKey)) {
        this.executeTask(destinationWorkerNodeKey, task)
      } else {
        this.enqueueTask(destinationWorkerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        destinationWorkerNodeKey,
        task.name as string
      )
    }
  }
}
/**
 * This method is the message listener registered on each worker.
 * It dispatches the message to the matching handler: worker ready response,
 * task execution response or task function names update.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    this.getWorkerInfo(senderWorkerNodeKey).taskFunctionNames =
      taskFunctionNames
  }
}
/**
 * Handles the ready response sent by a worker: the worker info is updated
 * with the readiness flag and the task function names, and the pool ready
 * event is emitted the first time the pool is found ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports a failed initialization.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const senderWorkerInfo = this.getWorkerInfo(senderWorkerNodeKey)
  senderWorkerInfo.ready = ready as boolean
  senderWorkerInfo.taskFunctionNames = taskFunctionNames
  // The ready event is emitted once only.
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
/**
 * Handles a task execution response sent by a worker: the promise bound to
 * the task id is settled, the after task execution bookkeeping is run and,
 * when the tasks queue is enabled, the next queued task is executed or the
 * idle worker node event is emitted.
 * Messages whose task id has no pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey } = pendingResponse
  if (workerError == null) {
    resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    // The promise is rejected with the error message, not the error object.
    reject(workerError.message)
  }
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  const canExecuteQueuedTask =
    this.tasksQueueSize(workerNodeKey) > 0 &&
    tasksUsage.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  const workerNodeIsIdle =
    tasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    tasksUsage.sequentiallyStolen === 0
  if (workerNodeIsIdle) {
    this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
      workerId: workerId as number,
      workerNodeKey
    })
  }
}
/**
 * Emits the pool busy event, with the pool information, when the pool is
 * busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
/**
 * Emits the pool back pressure event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
/**
 * Emits the pool full event, with the pool information, when a dynamic pool
 * is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
/**
 * Gets the worker information given its worker node key.
 * The lookup is done with optional chaining on the worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
/**
 * Adds the given worker in the pool worker nodes.
 * The worker node is created with the configured tasks queue size, or the
 * squared pool maximum size when none is configured.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const queueSize = this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, queueSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
/**
 * Flags the worker node identified by its key as not ready.
 * Does nothing when no worker node matches the key (e.g. a key of -1 for a
 * worker already removed from the pool), instead of throwing a TypeError.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  // The lookup relies on optional chaining and may yield undefined at
  // runtime despite its declared return type.
  const workerInfo = this.getWorkerInfo(workerNodeKey) as
    | WorkerInfo
    | undefined
  if (workerInfo != null) {
    workerInfo.ready = false
  }
}
/**
 * Whether the worker node identified by its key has back pressure.
 * Always `false` when the tasks queue is not enabled.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the tasks queue is enabled and the worker node has back pressure, `false` otherwise.
 */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
/**
 * Whether the pool has back pressure, i.e. the tasks queue is enabled and
 * no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
/**
 * Executes the given task on the worker given its worker node key: the
 * before task execution hook is run, the task is sent to the worker along
 * with its transfer list and the task execution events are checked.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node identified by its key, then
 * checks the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  return targetWorkerNode.tasksQueueSize()
}
/**
 * Flushes the tasks queue of the worker node identified by its key: every
 * queued task is dequeued and executed on that worker node, then the tasks
 * queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  for (
    let remaining = this.tasksQueueSize(workerNodeKey);
    remaining > 0;
    remaining = this.tasksQueueSize(workerNodeKey)
  ) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}