1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
24 import { KillBehaviors
} from
'../worker/worker-options'
25 import type { TaskFunction
} from
'../worker/task-functions'
33 type TasksQueueOptions
40 WorkerNodeEventDetail
,
45 type MeasurementStatisticsRequirements
,
47 WorkerChoiceStrategies
,
48 type WorkerChoiceStrategy
,
49 type WorkerChoiceStrategyOptions
50 } from
'./selection-strategies/selection-strategies-types'
51 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
52 import { version
} from
'./version'
53 import { WorkerNode
} from
'./worker-node'
56 checkValidTasksQueueOptions
,
57 checkValidWorkerChoiceStrategy
,
58 updateMeasurementStatistics
62 * Base class that implements some shared logic for all poolifier pools.
64 * @typeParam Worker - Type of worker which manages this pool.
65 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
66 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
68 export abstract class AbstractPool
<
69 Worker
extends IWorker
,
72 > implements IPool
<Worker
, Data
, Response
> {
74 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
77 public emitter
?: EventEmitterAsyncResource
80 * Dynamic pool maximum size property placeholder.
82 protected readonly max
?: number
85 * The task execution response promise map:
86 * - `key`: The message id of each submitted task.
87 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
89 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
91 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
92 new Map
<string, PromiseResponseWrapper
<Response
>>()
95 * Worker choice strategy context referencing a worker choice algorithm implementation.
97 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
104 * The task functions added at runtime map:
105 * - `key`: The task function name.
106 * - `value`: The task function itself.
108 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
111 * Whether the pool is started or not.
113 private started
: boolean
115 * Whether the pool is starting or not.
117 private starting
: boolean
119 * Whether the pool is destroying or not.
121 private destroying
: boolean
123 * Whether the pool ready event has been emitted or not.
125 private readyEventEmitted
: boolean
127 * The start timestamp of the pool.
129 private readonly startTimestamp
132 * Constructs a new poolifier pool.
134 * @param numberOfWorkers - Number of workers that this pool should manage.
135 * @param filePath - Path to the worker file.
136 * @param opts - Options for the pool.
139 protected readonly numberOfWorkers
: number,
140 protected readonly filePath
: string,
141 protected readonly opts
: PoolOptions
<Worker
>
143 if (!this.isMain()) {
145 'Cannot start a pool from a worker with the same type as the pool'
148 checkFilePath(this.filePath
)
149 this.checkNumberOfWorkers(this.numberOfWorkers
)
150 this.checkPoolOptions(this.opts
)
152 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
153 this.executeTask
= this.executeTask
.bind(this)
154 this.enqueueTask
= this.enqueueTask
.bind(this)
156 if (this.opts
.enableEvents
=== true) {
157 this.initializeEventEmitter()
159 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
165 this.opts
.workerChoiceStrategy
,
166 this.opts
.workerChoiceStrategyOptions
171 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
174 this.starting
= false
175 this.destroying
= false
176 this.readyEventEmitted
= false
177 if (this.opts
.startWorkers
=== true) {
181 this.startTimestamp
= performance
.now()
184 private checkNumberOfWorkers (numberOfWorkers
: number): void {
185 if (numberOfWorkers
== null) {
187 'Cannot instantiate a pool without specifying the number of workers'
189 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
191 'Cannot instantiate a pool with a non safe integer number of workers'
193 } else if (numberOfWorkers
< 0) {
194 throw new RangeError(
195 'Cannot instantiate a pool with a negative number of workers'
197 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
198 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
202 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
203 if (isPlainObject(opts
)) {
204 this.opts
.startWorkers
= opts
.startWorkers
?? true
205 checkValidWorkerChoiceStrategy(
206 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
208 this.opts
.workerChoiceStrategy
=
209 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
210 this.checkValidWorkerChoiceStrategyOptions(
211 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
213 this.opts
.workerChoiceStrategyOptions
= {
214 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
215 ...opts
.workerChoiceStrategyOptions
217 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
218 this.opts
.enableEvents
= opts
.enableEvents
?? true
219 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
220 if (this.opts
.enableTasksQueue
) {
221 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
222 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
223 opts
.tasksQueueOptions
as TasksQueueOptions
227 throw new TypeError('Invalid pool options: must be a plain object')
231 private checkValidWorkerChoiceStrategyOptions (
232 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
235 workerChoiceStrategyOptions
!= null &&
236 !isPlainObject(workerChoiceStrategyOptions
)
239 'Invalid worker choice strategy options: must be a plain object'
243 workerChoiceStrategyOptions
?.retries
!= null &&
244 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
247 'Invalid worker choice strategy options: retries must be an integer'
251 workerChoiceStrategyOptions
?.retries
!= null &&
252 workerChoiceStrategyOptions
.retries
< 0
254 throw new RangeError(
255 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
259 workerChoiceStrategyOptions
?.weights
!= null &&
260 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
263 'Invalid worker choice strategy options: must have a weight for each worker node'
267 workerChoiceStrategyOptions
?.measurement
!= null &&
268 !Object.values(Measurements
).includes(
269 workerChoiceStrategyOptions
.measurement
273 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
278 private initializeEventEmitter (): void {
279 this.emitter
= new EventEmitterAsyncResource({
280 name
: `poolifier:${this.type}-${this.worker}-pool`
285 public get
info (): PoolInfo
{
290 started
: this.started
,
292 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
293 minSize
: this.minSize
,
294 maxSize
: this.maxSize
,
295 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
296 .runTime
.aggregate
&&
297 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
298 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
299 workerNodes
: this.workerNodes
.length
,
300 idleWorkerNodes
: this.workerNodes
.reduce(
301 (accumulator
, workerNode
) =>
302 workerNode
.usage
.tasks
.executing
=== 0
307 busyWorkerNodes
: this.workerNodes
.reduce(
308 (accumulator
, workerNode
) =>
309 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
312 executedTasks
: this.workerNodes
.reduce(
313 (accumulator
, workerNode
) =>
314 accumulator
+ workerNode
.usage
.tasks
.executed
,
317 executingTasks
: this.workerNodes
.reduce(
318 (accumulator
, workerNode
) =>
319 accumulator
+ workerNode
.usage
.tasks
.executing
,
322 ...(this.opts
.enableTasksQueue
=== true && {
323 queuedTasks
: this.workerNodes
.reduce(
324 (accumulator
, workerNode
) =>
325 accumulator
+ workerNode
.usage
.tasks
.queued
,
329 ...(this.opts
.enableTasksQueue
=== true && {
330 maxQueuedTasks
: this.workerNodes
.reduce(
331 (accumulator
, workerNode
) =>
332 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
336 ...(this.opts
.enableTasksQueue
=== true && {
337 backPressure
: this.hasBackPressure()
339 ...(this.opts
.enableTasksQueue
=== true && {
340 stolenTasks
: this.workerNodes
.reduce(
341 (accumulator
, workerNode
) =>
342 accumulator
+ workerNode
.usage
.tasks
.stolen
,
346 failedTasks
: this.workerNodes
.reduce(
347 (accumulator
, workerNode
) =>
348 accumulator
+ workerNode
.usage
.tasks
.failed
,
351 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
352 .runTime
.aggregate
&& {
356 ...this.workerNodes
.map(
357 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
363 ...this.workerNodes
.map(
364 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
368 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
369 .runTime
.average
&& {
372 this.workerNodes
.reduce
<number[]>(
373 (accumulator
, workerNode
) =>
374 accumulator
.concat(workerNode
.usage
.runTime
.history
),
380 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
384 this.workerNodes
.reduce
<number[]>(
385 (accumulator
, workerNode
) =>
386 accumulator
.concat(workerNode
.usage
.runTime
.history
),
394 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
395 .waitTime
.aggregate
&& {
399 ...this.workerNodes
.map(
400 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
406 ...this.workerNodes
.map(
407 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
411 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
412 .waitTime
.average
&& {
415 this.workerNodes
.reduce
<number[]>(
416 (accumulator
, workerNode
) =>
417 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
423 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
424 .waitTime
.median
&& {
427 this.workerNodes
.reduce
<number[]>(
428 (accumulator
, workerNode
) =>
429 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
441 * The pool readiness boolean status.
443 private get
ready (): boolean {
445 this.workerNodes
.reduce(
446 (accumulator
, workerNode
) =>
447 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
/**
 * The approximate pool utilization.
 *
 * Computed as the summed run time and wait time aggregates of every worker
 * node, divided by the time elapsed since the pool start timestamp multiplied
 * by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  // Time budget the pool had available since its start timestamp.
  const elapsedSinceStart = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedSinceStart * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    // Missing aggregates count as zero.
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
479 * If it is `'dynamic'`, it provides the `max` property.
481 protected abstract get
type (): PoolType
486 protected abstract get
worker (): WorkerType
/**
 * The pool minimum size.
 *
 * @returns The number of workers the pool was constructed with.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
/**
 * The pool maximum size.
 *
 * @returns The `max` property when it is defined, the number of workers otherwise.
 */
protected get maxSize (): number {
  if (this.max != null) {
    return this.max
  }
  return this.numberOfWorkers
}
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any pool worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // A key of -1 means no worker node in the pool carries this worker id.
  const isUnknownWorker = this.getWorkerNodeKeyByWorkerId(workerId) === -1
  if (isUnknownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const ownsWorker = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(ownsWorker)
}
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  const hasWorkerId = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
/**
 * Sets the worker choice strategy used by the pool.
 *
 * The strategy is validated, stored in the pool options and handed to the
 * worker choice strategy context. Every worker node then has its usage reset
 * and is sent a statistics message.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @param workerChoiceStrategyOptions - The optional worker choice strategy options, applied when provided.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategy = workerChoiceStrategy
  workerChoiceStrategyContext.setWorkerChoiceStrategy(
    opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
/**
 * Sets the worker choice strategy options.
 *
 * The given options are validated, merged over the default worker choice
 * strategy options, stored in the pool options and passed to the worker
 * choice strategy context.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Caller supplied values override the defaults.
  const mergedOptions: WorkerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
576 public enableTasksQueue (
578 tasksQueueOptions
?: TasksQueueOptions
580 if (this.opts
.enableTasksQueue
=== true && !enable
) {
581 this.unsetTaskStealing()
582 this.unsetTasksStealingOnBackPressure()
583 this.flushTasksQueues()
585 this.opts
.enableTasksQueue
= enable
586 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
590 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
591 if (this.opts
.enableTasksQueue
=== true) {
592 checkValidTasksQueueOptions(tasksQueueOptions
)
593 this.opts
.tasksQueueOptions
=
594 this.buildTasksQueueOptions(tasksQueueOptions
)
595 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
596 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
597 this.setTaskStealing()
599 this.unsetTaskStealing()
601 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
602 this.setTasksStealingOnBackPressure()
604 this.unsetTasksStealingOnBackPressure()
606 } else if (this.opts
.tasksQueueOptions
!= null) {
607 delete this.opts
.tasksQueueOptions
611 private buildTasksQueueOptions (
612 tasksQueueOptions
: TasksQueueOptions
613 ): TasksQueueOptions
{
616 size
: Math.pow(this.maxSize
, 2),
619 tasksStealingOnBackPressure
: true
/**
 * Applies the given size as the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
631 private setTaskStealing (): void {
632 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
633 this.workerNodes
[workerNodeKey
].addEventListener(
635 this.handleIdleWorkerNodeEvent
as EventListener
640 private unsetTaskStealing (): void {
641 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
642 this.workerNodes
[workerNodeKey
].removeEventListener(
644 this.handleIdleWorkerNodeEvent
as EventListener
649 private setTasksStealingOnBackPressure (): void {
650 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
651 this.workerNodes
[workerNodeKey
].addEventListener(
653 this.handleBackPressureEvent
as EventListener
658 private unsetTasksStealingOnBackPressure (): void {
659 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
660 this.workerNodes
[workerNodeKey
].removeEventListener(
662 this.handleBackPressureEvent
as EventListener
/**
 * Whether the pool is full or not.
 *
 * The pool filling boolean status: `true` once the number of worker nodes
 * reaches the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
677 * Whether the pool is busy or not.
679 * The pool busyness boolean status.
681 protected abstract get
busy (): boolean
684 * Whether worker nodes are executing concurrently their tasks quota or not.
686 * @returns Worker nodes busyness boolean status.
688 protected internalBusy (): boolean {
689 if (this.opts
.enableTasksQueue
=== true) {
691 this.workerNodes
.findIndex(
693 workerNode
.info
.ready
&&
694 workerNode
.usage
.tasks
.executing
<
695 (this.opts
.tasksQueueOptions
?.concurrency
as number)
700 this.workerNodes
.findIndex(
702 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
707 private async sendTaskFunctionOperationToWorker (
708 workerNodeKey
: number,
709 message
: MessageValue
<Data
>
710 ): Promise
<boolean> {
711 return await new Promise
<boolean>((resolve
, reject
) => {
712 const taskFunctionOperationListener
= (
713 message
: MessageValue
<Response
>
715 this.checkMessageWorkerId(message
)
716 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
718 message
.taskFunctionOperationStatus
!= null &&
719 message
.workerId
=== workerId
721 if (message
.taskFunctionOperationStatus
) {
723 } else if (!message
.taskFunctionOperationStatus
) {
726 `Task function operation '${
727 message.taskFunctionOperation as string
728 }' failed on worker ${message.workerId} with error: '${
729 message.workerError?.message as string
734 this.deregisterWorkerMessageListener(
735 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
736 taskFunctionOperationListener
740 this.registerWorkerMessageListener(
742 taskFunctionOperationListener
744 this.sendToWorker(workerNodeKey
, message
)
748 private async sendTaskFunctionOperationToWorkers (
749 message
: MessageValue
<Data
>
750 ): Promise
<boolean> {
751 return await new Promise
<boolean>((resolve
, reject
) => {
752 const responsesReceived
= new Array<MessageValue
<Response
>>()
753 const taskFunctionOperationsListener
= (
754 message
: MessageValue
<Response
>
756 this.checkMessageWorkerId(message
)
757 if (message
.taskFunctionOperationStatus
!= null) {
758 responsesReceived
.push(message
)
759 if (responsesReceived
.length
=== this.workerNodes
.length
) {
761 responsesReceived
.every(
762 message
=> message
.taskFunctionOperationStatus
=== true
767 responsesReceived
.some(
768 message
=> message
.taskFunctionOperationStatus
=== false
771 const errorResponse
= responsesReceived
.find(
772 response
=> response
.taskFunctionOperationStatus
=== false
776 `Task function operation '${
777 message.taskFunctionOperation as string
778 }' failed on worker ${
779 errorResponse?.workerId as number
781 errorResponse?.workerError?.message as string
786 this.deregisterWorkerMessageListener(
787 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
788 taskFunctionOperationsListener
793 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
794 this.registerWorkerMessageListener(
796 taskFunctionOperationsListener
798 this.sendToWorker(workerNodeKey
, message
)
/**
 * Whether a task function with the given name is known by at least one worker node.
 *
 * @param name - The task function name.
 * @returns `true` if a worker node lists the name in its task function names, `false` otherwise.
 */
public hasTaskFunction (name: string): boolean {
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
817 public async addTaskFunction (
819 fn
: TaskFunction
<Data
, Response
>
820 ): Promise
<boolean> {
821 if (typeof name
!== 'string') {
822 throw new TypeError('name argument must be a string')
824 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
825 throw new TypeError('name argument must not be an empty string')
827 if (typeof fn
!== 'function') {
828 throw new TypeError('fn argument must be a function')
830 const opResult
= await this.sendTaskFunctionOperationToWorkers({
831 taskFunctionOperation
: 'add',
832 taskFunctionName
: name
,
833 taskFunction
: fn
.toString()
835 this.taskFunctions
.set(name
, fn
)
840 public async removeTaskFunction (name
: string): Promise
<boolean> {
841 if (!this.taskFunctions
.has(name
)) {
843 'Cannot remove a task function not handled on the pool side'
846 const opResult
= await this.sendTaskFunctionOperationToWorkers({
847 taskFunctionOperation
: 'remove',
848 taskFunctionName
: name
850 this.deleteTaskFunctionWorkerUsages(name
)
851 this.taskFunctions
.delete(name
)
856 public listTaskFunctionNames (): string[] {
857 for (const workerNode
of this.workerNodes
) {
859 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
860 workerNode
.info
.taskFunctionNames
.length
> 0
862 return workerNode
.info
.taskFunctionNames
/**
 * Sends a `'default'` task function operation for the given name to the workers.
 *
 * @param name - The task function name.
 * @returns The result of the task function operation sent to the workers.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operationMessage)
}
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
/**
 * Whether a task shall be executed on the given worker node rather than enqueued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and its number of executing tasks is below the tasks queue concurrency option, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  return executingTasks < concurrency
}
891 public async execute (
894 transferList
?: TransferListItem
[]
895 ): Promise
<Response
> {
896 return await new Promise
<Response
>((resolve
, reject
) => {
898 reject(new Error('Cannot execute a task on not started pool'))
901 if (this.destroying
) {
902 reject(new Error('Cannot execute a task on destroying pool'))
905 if (name
!= null && typeof name
!== 'string') {
906 reject(new TypeError('name argument must be a string'))
911 typeof name
=== 'string' &&
912 name
.trim().length
=== 0
914 reject(new TypeError('name argument must not be an empty string'))
917 if (transferList
!= null && !Array.isArray(transferList
)) {
918 reject(new TypeError('transferList argument must be an array'))
921 const timestamp
= performance
.now()
922 const workerNodeKey
= this.chooseWorkerNode()
923 const task
: Task
<Data
> = {
924 name
: name
?? DEFAULT_TASK_NAME
,
925 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
926 data
: data
?? ({} as Data
),
931 this.promiseResponseMap
.set(task
.taskId
as string, {
937 this.opts
.enableTasksQueue
=== false ||
938 (this.opts
.enableTasksQueue
=== true &&
939 this.shallExecuteTask(workerNodeKey
))
941 this.executeTask(workerNodeKey
, task
)
943 this.enqueueTask(workerNodeKey
, task
)
949 public start (): void {
951 throw new Error('Cannot start an already started pool')
954 throw new Error('Cannot start an already starting pool')
956 if (this.destroying
) {
957 throw new Error('Cannot start a destroying pool')
961 this.workerNodes
.reduce(
962 (accumulator
, workerNode
) =>
963 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
965 ) < this.numberOfWorkers
967 this.createAndSetupWorkerNode()
969 this.starting
= false
974 public async destroy (): Promise
<void> {
976 throw new Error('Cannot destroy an already destroyed pool')
979 throw new Error('Cannot destroy an starting pool')
981 if (this.destroying
) {
982 throw new Error('Cannot destroy an already destroying pool')
984 this.destroying
= true
986 this.workerNodes
.map(async (_
, workerNodeKey
) => {
987 await this.destroyWorkerNode(workerNodeKey
)
990 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
991 this.emitter
?.emitDestroy()
992 this.readyEventEmitted
= false
993 this.destroying
= false
997 protected async sendKillMessageToWorker (
998 workerNodeKey
: number
1000 await new Promise
<void>((resolve
, reject
) => {
1001 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1002 this.checkMessageWorkerId(message
)
1003 if (message
.kill
=== 'success') {
1005 } else if (message
.kill
=== 'failure') {
1008 `Kill message handling failed on worker ${
1009 message.workerId as number
1015 // FIXME: should be registered only once
1016 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1017 this.sendToWorker(workerNodeKey
, { kill
: true })
1022 * Terminates the worker node given its worker node key.
1024 * @param workerNodeKey - The worker node key.
1026 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden.
 */
protected setupHook (): void {
  // Intentionally empty
}
1039 * Should return whether the worker is the main worker or not.
1041 protected abstract isMain (): boolean
1044 * Hook executed before the worker task execution.
1045 * Can be overridden.
1047 * @param workerNodeKey - The worker node key.
1048 * @param task - The task to execute.
1050 protected beforeTaskExecutionHook (
1051 workerNodeKey
: number,
1054 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1055 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1056 ++workerUsage
.tasks
.executing
1057 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1060 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1061 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1065 const taskFunctionWorkerUsage
= this.workerNodes
[
1067 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1068 ++taskFunctionWorkerUsage
.tasks
.executing
1069 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1074 * Hook executed after the worker task execution.
1075 * Can be overridden.
1077 * @param workerNodeKey - The worker node key.
1078 * @param message - The received message.
1080 protected afterTaskExecutionHook (
1081 workerNodeKey
: number,
1082 message
: MessageValue
<Response
>
1084 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1085 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1086 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1087 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1088 this.updateEluWorkerUsage(workerUsage
, message
)
1091 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1092 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1093 message
.taskPerformance
?.name
as string
1096 const taskFunctionWorkerUsage
= this.workerNodes
[
1098 ].getTaskFunctionWorkerUsage(
1099 message
.taskPerformance
?.name
as string
1101 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1102 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1103 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return false
  }
  // Only worker nodes listing more than two task function names qualify.
  return (
    Array.isArray(workerInfo.taskFunctionNames) &&
    workerInfo.taskFunctionNames.length > 2
  )
}
/**
 * Updates the tasks statistics of a worker usage from a received worker message.
 *
 * The executing counter is decremented when it is positive, then either the
 * executed or the failed counter is incremented depending on whether the
 * message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const taskStatistics = workerUsage.tasks
  const hasExecutingTasks =
    taskStatistics.executing != null && taskStatistics.executing > 0
  if (hasExecutingTasks) {
    // Never let the executing counter go below zero.
    --taskStatistics.executing
  }
  if (message.workerError != null) {
    ++taskStatistics.failed
  } else {
    ++taskStatistics.executed
  }
}
1140 private updateRunTimeWorkerUsage (
1141 workerUsage
: WorkerUsage
,
1142 message
: MessageValue
<Response
>
1144 if (message
.workerError
!= null) {
1147 updateMeasurementStatistics(
1148 workerUsage
.runTime
,
1149 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1150 message
.taskPerformance
?.runTime
?? 0
1154 private updateWaitTimeWorkerUsage (
1155 workerUsage
: WorkerUsage
,
1158 const timestamp
= performance
.now()
1159 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1160 updateMeasurementStatistics(
1161 workerUsage
.waitTime
,
1162 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1167 private updateEluWorkerUsage (
1168 workerUsage
: WorkerUsage
,
1169 message
: MessageValue
<Response
>
1171 if (message
.workerError
!= null) {
1174 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1175 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1176 updateMeasurementStatistics(
1177 workerUsage
.elu
.active
,
1178 eluTaskStatisticsRequirements
,
1179 message
.taskPerformance
?.elu
?.active
?? 0
1181 updateMeasurementStatistics(
1182 workerUsage
.elu
.idle
,
1183 eluTaskStatisticsRequirements
,
1184 message
.taskPerformance
?.elu
?.idle
?? 0
1186 if (eluTaskStatisticsRequirements
.aggregate
) {
1187 if (message
.taskPerformance
?.elu
!= null) {
1188 if (workerUsage
.elu
.utilization
!= null) {
1189 workerUsage
.elu
.utilization
=
1190 (workerUsage
.elu
.utilization
+
1191 message
.taskPerformance
.elu
.utilization
) /
1194 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
1201 * Chooses a worker node for the next task.
1203 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1205 * @returns The chosen worker node key
1207 private chooseWorkerNode (): number {
1208 if (this.shallCreateDynamicWorker()) {
1209 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1211 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1213 return workerNodeKey
1216 return this.workerChoiceStrategyContext
.execute()
/**
 * Conditions for dynamic worker creation: the pool is of dynamic type, is not
 * full and its worker nodes are internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1229 * Sends a message to worker given its worker node key.
1231 * @param workerNodeKey - The worker node key.
1232 * @param message - The message.
1233 * @param transferList - The optional array of transferable objects.
1235 protected abstract sendToWorker (
1236 workerNodeKey
: number,
1237 message
: MessageValue
<Data
>,
1238 transferList
?: TransferListItem
[]
1242 * Creates a new worker.
1244 * @returns Newly created worker.
1246 protected abstract createWorker (): Worker
1249 * Creates a new, completely set up worker node.
1251 * @returns New, completely set up worker node key.
1253 protected createAndSetupWorkerNode (): number {
1254 const worker
= this.createWorker()
1256 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1257 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1258 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1259 worker
.on('error', error
=> {
1260 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1261 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1262 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1263 this.emitter
?.emit(PoolEvents
.error
, error
)
1264 this.workerNodes
[workerNodeKey
].closeChannel()
1269 this.opts
.restartWorkerOnError
=== true
1271 if (workerInfo
.dynamic
) {
1272 this.createAndSetupDynamicWorkerNode()
1274 this.createAndSetupWorkerNode()
1277 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1278 this.redistributeQueuedTasks(workerNodeKey
)
1281 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1282 worker
.once('exit', () => {
1283 this.removeWorkerNode(worker
)
1286 const workerNodeKey
= this.addWorkerNode(worker
)
1288 this.afterWorkerNodeSetup(workerNodeKey
)
1290 return workerNodeKey
1294 * Creates a new, completely set up dynamic worker node.
1296 * @returns New, completely set up dynamic worker node key.
1298 protected createAndSetupDynamicWorkerNode (): number {
1299 const workerNodeKey
= this.createAndSetupWorkerNode()
1300 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1301 this.checkMessageWorkerId(message
)
1302 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1305 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1306 // Kill message received from worker
1308 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1309 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1310 ((this.opts
.enableTasksQueue
=== false &&
1311 workerUsage
.tasks
.executing
=== 0) ||
1312 (this.opts
.enableTasksQueue
=== true &&
1313 workerUsage
.tasks
.executing
=== 0 &&
1314 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1316 // Flag the worker node as not ready immediately
1317 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1318 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1319 this.emitter
?.emit(PoolEvents
.error
, error
)
1323 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1324 this.sendToWorker(workerNodeKey
, {
1327 if (this.taskFunctions
.size
> 0) {
1328 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1329 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1330 taskFunctionOperation
: 'add',
1332 taskFunction
: taskFunction
.toString()
1334 this.emitter
?.emit(PoolEvents
.error
, error
)
1338 workerInfo
.dynamic
= true
1340 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1341 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1343 workerInfo
.ready
= true
1345 this.checkAndEmitDynamicWorkerCreationEvents()
1346 return workerNodeKey
1350 * Registers a listener callback on the worker given its worker node key.
1352 * @param workerNodeKey - The worker node key.
1353 * @param listener - The message listener callback.
1355 protected abstract registerWorkerMessageListener
<
1356 Message
extends Data
| Response
1358 workerNodeKey
: number,
1359 listener
: (message
: MessageValue
<Message
>) => void
1363 * Registers once a listener callback on the worker given its worker node key.
1365 * @param workerNodeKey - The worker node key.
1366 * @param listener - The message listener callback.
1368 protected abstract registerOnceWorkerMessageListener
<
1369 Message
extends Data
| Response
1371 workerNodeKey
: number,
1372 listener
: (message
: MessageValue
<Message
>) => void
1376 * Deregisters a listener callback on the worker given its worker node key.
1378 * @param workerNodeKey - The worker node key.
1379 * @param listener - The message listener callback.
1381 protected abstract deregisterWorkerMessageListener
<
1382 Message
extends Data
| Response
1384 workerNodeKey
: number,
1385 listener
: (message
: MessageValue
<Message
>) => void
1389 * Method hooked up after a worker node has been newly created.
1390 * Can be overridden.
1392 * @param workerNodeKey - The newly created worker node key.
1394 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1395 // Listen to worker messages.
1396 this.registerWorkerMessageListener(
1398 this.workerMessageListener
.bind(this)
1400 // Send the startup message to worker.
1401 this.sendStartupMessageToWorker(workerNodeKey
)
1402 // Send the statistics message to worker.
1403 this.sendStatisticsMessageToWorker(workerNodeKey
)
1404 if (this.opts
.enableTasksQueue
=== true) {
1405 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1406 this.workerNodes
[workerNodeKey
].addEventListener(
1408 this.handleIdleWorkerNodeEvent
as EventListener
1411 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1412 this.workerNodes
[workerNodeKey
].addEventListener(
1414 this.handleBackPressureEvent
as EventListener
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Abstract: implemented by the concrete pool. `afterWorkerNodeSetup()` calls it
 * once the worker message listener has been registered.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1428 * Sends the statistics message to worker given its worker node key.
1430 * @param workerNodeKey - The worker node key.
1432 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1433 this.sendToWorker(workerNodeKey
, {
1436 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1438 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1444 private redistributeQueuedTasks (workerNodeKey
: number): void {
1445 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1446 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1447 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1448 return workerNode
.info
.ready
&&
1449 workerNode
.usage
.tasks
.queued
<
1450 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1456 const task
= this.dequeueTask(workerNodeKey
) as Task
<Data
>
1457 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1458 this.executeTask(destinationWorkerNodeKey
, task
)
1460 this.enqueueTask(destinationWorkerNodeKey
, task
)
1465 private updateTaskStolenStatisticsWorkerUsage (
1466 workerNodeKey
: number,
1469 const workerNode
= this.workerNodes
[workerNodeKey
]
1470 if (workerNode
?.usage
!= null) {
1471 ++workerNode
.usage
.tasks
.stolen
1474 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1475 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1477 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1480 ++taskFunctionWorkerUsage
.tasks
.stolen
/**
 * Increments the sequentially stolen tasks counter of a worker node.
 * Does nothing when no worker node usage exists at the given key.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage == null) {
    return
  }
  usage.tasks.sequentiallyStolen += 1
}
1493 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1494 workerNodeKey
: number,
1497 const workerNode
= this.workerNodes
[workerNodeKey
]
1499 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1500 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1502 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1505 ++taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node.
 * Does nothing when no worker node usage exists at the given key.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage == null) {
    return
  }
  usage.tasks.sequentiallyStolen = 0
}
1518 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1519 workerNodeKey
: number,
1522 const workerNode
= this.workerNodes
[workerNodeKey
]
1524 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1525 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1527 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1530 taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
= 0
1534 private readonly handleIdleWorkerNodeEvent
= (
1535 event
: CustomEvent
<WorkerNodeEventDetail
>,
1536 previousStolenTask
?: Task
<Data
>
1538 const { workerNodeKey
} = event
.detail
1539 if (workerNodeKey
== null) {
1541 'WorkerNode event detail workerNodeKey attribute must be defined'
1544 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1546 previousStolenTask
!= null &&
1547 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1548 (workerNodeTasksUsage
.executing
> 0 ||
1549 this.tasksQueueSize(workerNodeKey
) > 0)
1551 for (const taskName
of this.workerNodes
[workerNodeKey
].info
1552 .taskFunctionNames
as string[]) {
1553 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1558 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1561 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1563 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1566 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1568 ].getTaskFunctionWorkerUsage(stolenTask
.name
as string)
1569 ?.tasks
as TaskStatistics
1571 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1572 (previousStolenTask
!= null &&
1573 previousStolenTask
.name
=== stolenTask
.name
&&
1574 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1576 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1578 stolenTask
.name
as string
1581 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1583 stolenTask
.name
as string
1587 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1589 this.handleIdleWorkerNodeEvent(event
, stolenTask
)
1592 .catch(EMPTY_FUNCTION
)
1595 private readonly workerNodeStealTask
= (
1596 workerNodeKey
: number
1597 ): Task
<Data
> | undefined => {
1598 const workerNodes
= this.workerNodes
1601 (workerNodeA
, workerNodeB
) =>
1602 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1604 const sourceWorkerNode
= workerNodes
.find(
1605 (sourceWorkerNode
, sourceWorkerNodeKey
) =>
1606 sourceWorkerNode
.info
.ready
&&
1607 sourceWorkerNodeKey
!== workerNodeKey
&&
1608 sourceWorkerNode
.usage
.tasks
.queued
> 0
1610 if (sourceWorkerNode
!= null) {
1611 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1612 if (this.shallExecuteTask(workerNodeKey
)) {
1613 this.executeTask(workerNodeKey
, task
)
1615 this.enqueueTask(workerNodeKey
, task
)
1617 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1618 this.updateTaskStolenStatisticsWorkerUsage(
1626 private readonly handleBackPressureEvent
= (
1627 event
: CustomEvent
<WorkerNodeEventDetail
>
1629 const { workerId
} = event
.detail
1630 const sizeOffset
= 1
1631 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1634 const sourceWorkerNode
=
1635 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1636 const workerNodes
= this.workerNodes
1639 (workerNodeA
, workerNodeB
) =>
1640 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1642 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1644 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1645 workerNode
.info
.ready
&&
1646 workerNode
.info
.id
!== workerId
&&
1647 workerNode
.usage
.tasks
.queued
<
1648 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1650 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1651 if (this.shallExecuteTask(workerNodeKey
)) {
1652 this.executeTask(workerNodeKey
, task
)
1654 this.enqueueTask(workerNodeKey
, task
)
1656 this.updateTaskStolenStatisticsWorkerUsage(
1665 * This method is the message listener registered on each worker.
1667 protected workerMessageListener (message
: MessageValue
<Response
>): void {
1668 this.checkMessageWorkerId(message
)
1669 const { workerId
, ready
, taskId
, taskFunctionNames
} = message
1670 if (ready
!= null && taskFunctionNames
!= null) {
1671 // Worker ready response received from worker
1672 this.handleWorkerReadyResponse(message
)
1673 } else if (taskId
!= null) {
1674 // Task execution response received from worker
1675 this.handleTaskExecutionResponse(message
)
1676 } else if (taskFunctionNames
!= null) {
1677 // Task function names message received from worker
1679 this.getWorkerNodeKeyByWorkerId(workerId
)
1680 ).taskFunctionNames
= taskFunctionNames
/**
 * Handles the ready response received from a worker: stores the readiness flag
 * and the task function names on the worker information, then emits the pool
 * `ready` event the first time the pool reports itself as ready.
 *
 * @param message - The ready response message received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready === false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const {
    workerId,
    ready: workerReady,
    taskFunctionNames: workerTaskFunctionNames
  } = message
  if (workerReady === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const readyWorkerInfo = this.getWorkerInfo(readyWorkerNodeKey)
  readyWorkerInfo.ready = workerReady as boolean
  readyWorkerInfo.taskFunctionNames = workerTaskFunctionNames
  // The pool readiness is only evaluated while the event has not been emitted yet.
  if (this.readyEventEmitted) {
    return
  }
  if (this.ready) {
    this.readyEventEmitted = true
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1700 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1701 const { workerId
, taskId
, workerError
, data
} = message
1702 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1703 if (promiseResponse
!= null) {
1704 const { resolve
, reject
, workerNodeKey
} = promiseResponse
1705 if (workerError
!= null) {
1706 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1707 reject(workerError
.message
)
1709 resolve(data
as Response
)
1711 this.afterTaskExecutionHook(workerNodeKey
, message
)
1712 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1713 this.promiseResponseMap
.delete(taskId
as string)
1714 if (this.opts
.enableTasksQueue
=== true) {
1715 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1717 this.tasksQueueSize(workerNodeKey
) > 0 &&
1718 workerNodeTasksUsage
.executing
<
1719 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1723 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1727 workerNodeTasksUsage
.executing
=== 0 &&
1728 this.tasksQueueSize(workerNodeKey
) === 0 &&
1729 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1731 this.workerNodes
[workerNodeKey
].dispatchEvent(
1732 new CustomEvent
<WorkerNodeEventDetail
>('idleWorkerNode', {
1733 detail
: { workerId
: workerId
as number, workerNodeKey
}
1741 private checkAndEmitTaskExecutionEvents (): void {
1743 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
/**
 * Emits the pool `backPressure` event, with the pool information as payload,
 * when the pool reports back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1753 private checkAndEmitDynamicWorkerCreationEvents (): void {
1754 if (this.type === PoolTypes
.dynamic
) {
1756 this.emitter
?.emit(PoolEvents
.full
, this.info
)
/**
 * Gets the worker information given its worker node key.
 *
 * The lookup is done with optional chaining: at runtime the result is
 * `undefined` when no worker node is stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1772 * Adds the given worker in the pool worker nodes.
1774 * @param worker - The worker.
1775 * @returns The added worker node key.
1776 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1778 private addWorkerNode (worker
: Worker
): number {
1779 const workerNode
= new WorkerNode
<Worker
, Data
>(
1781 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1783 // Flag the worker node as ready at pool startup.
1784 if (this.starting
) {
1785 workerNode
.info
.ready
= true
1787 this.workerNodes
.push(workerNode
)
1788 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1789 if (workerNodeKey
=== -1) {
1790 throw new Error('Worker added not found in worker nodes')
1792 return workerNodeKey
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    // Unknown worker: nothing to remove.
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
/**
 * Flags the worker node at the given key as not ready.
 *
 * Does nothing when no worker node exists at that key. The worker `error`
 * listener resolves the key with `getWorkerNodeKeyByWorker()`, which yields
 * `-1` for a worker that is no longer in the pool; `getWorkerInfo()` then
 * returns `undefined` and an unguarded write would throw a `TypeError` from
 * inside the event handler.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  // getWorkerInfo() uses optional chaining, so its result may be undefined at runtime.
  const workerInfo: WorkerInfo | undefined = this.getWorkerInfo(workerNodeKey)
  if (workerInfo != null) {
    workerInfo.ready = false
  }
}
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queuing enabled, a worker node never reports back pressure.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  // Otherwise defer to the worker node own back pressure check.
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.hasBackPressure()
}
1820 private hasBackPressure (): boolean {
1822 this.opts
.enableTasksQueue
=== true &&
1823 this.workerNodes
.findIndex(
1824 workerNode
=> !workerNode
.hasBackPressure()
/**
 * Executes the given task on the worker given its worker node key.
 *
 * Runs the before task execution hook, sends the task (with its transfer list)
 * to the worker, then checks whether task execution events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // Read the transfer list only after the hook has run, as the original does.
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node at the given key, then checks
 * whether task queuing events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const queriedWorkerNode = this.workerNodes[workerNodeKey]
  return queriedWorkerNode.tasksQueueSize()
}
1855 protected flushTasksQueue (workerNodeKey
: number): void {
1856 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1859 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1862 this.workerNodes
[workerNodeKey
].clearTasksQueue()
/**
 * Flushes the tasks queue of every worker node, in worker node key order.
 */
private flushTasksQueues (): void {
  // keys() iterates the live array exactly like entries(), minus the unused value.
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}