1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
5 import { AsyncResource
} from
'node:async_hooks'
8 PromiseResponseWrapper
,
10 } from
'../utility-types.js'
24 import { KillBehaviors
} from
'../worker/worker-options.js'
25 import type { TaskFunction
} from
'../worker/task-functions.js'
33 type TasksQueueOptions
40 WorkerNodeEventDetail
,
46 WorkerChoiceStrategies
,
47 type WorkerChoiceStrategy
,
48 type WorkerChoiceStrategyOptions
49 } from
'./selection-strategies/selection-strategies-types.js'
50 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
51 import { version
} from
'./version.js'
52 import { WorkerNode
} from
'./worker-node.js'
55 checkValidTasksQueueOptions
,
56 checkValidWorkerChoiceStrategy
,
57 getDefaultTasksQueueOptions
,
59 updateRunTimeWorkerUsage
,
60 updateTaskStatisticsWorkerUsage
,
61 updateWaitTimeWorkerUsage
,
66 * Base class that implements some shared logic for all poolifier pools.
68 * @typeParam Worker - Type of worker which manages this pool.
69 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
70 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
72 export abstract class AbstractPool
<
73 Worker
extends IWorker
,
76 > implements IPool
<Worker
, Data
, Response
> {
78 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
81 public emitter
?: EventEmitterAsyncResource
84 * The task execution response promise map:
85 * - `key`: The message id of each submitted task.
86 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
88 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
90 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
91 new Map
<string, PromiseResponseWrapper
<Response
>>()
94 * Worker choice strategy context referencing a worker choice algorithm implementation.
96 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
103 * The task functions added at runtime map:
104 * - `key`: The task function name.
105 * - `value`: The task function itself.
107 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
110 * Whether the pool is started or not.
112 private started
: boolean
114 * Whether the pool is starting or not.
116 private starting
: boolean
118 * Whether the pool is destroying or not.
120 private destroying
: boolean
122 * Whether the pool ready event has been emitted or not.
124 private readyEventEmitted
: boolean
126 * The start timestamp of the pool.
128 private readonly startTimestamp
131 * Constructs a new poolifier pool.
133 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
134 * @param filePath - Path to the worker file.
135 * @param opts - Options for the pool.
136 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
139 protected readonly minimumNumberOfWorkers
: number,
140 protected readonly filePath
: string,
141 protected readonly opts
: PoolOptions
<Worker
>,
142 protected readonly maximumNumberOfWorkers
?: number
144 if (!this.isMain()) {
146 'Cannot start a pool from a worker with the same type as the pool'
150 checkFilePath(this.filePath
)
151 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
152 this.checkPoolOptions(this.opts
)
154 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
155 this.executeTask
= this.executeTask
.bind(this)
156 this.enqueueTask
= this.enqueueTask
.bind(this)
158 if (this.opts
.enableEvents
=== true) {
159 this.initializeEventEmitter()
161 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
167 this.opts
.workerChoiceStrategy
,
168 this.opts
.workerChoiceStrategyOptions
173 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
176 this.starting
= false
177 this.destroying
= false
178 this.readyEventEmitted
= false
179 if (this.opts
.startWorkers
=== true) {
183 this.startTimestamp
= performance
.now()
186 private checkPoolType (): void {
187 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
189 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
194 private checkMinimumNumberOfWorkers (minimumNumberOfWorkers
: number): void {
195 if (minimumNumberOfWorkers
== null) {
197 'Cannot instantiate a pool without specifying the number of workers'
199 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
201 'Cannot instantiate a pool with a non safe integer number of workers'
203 } else if (minimumNumberOfWorkers
< 0) {
204 throw new RangeError(
205 'Cannot instantiate a pool with a negative number of workers'
207 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
208 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
212 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
213 if (isPlainObject(opts
)) {
214 this.opts
.startWorkers
= opts
.startWorkers
?? true
215 checkValidWorkerChoiceStrategy(
216 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
218 this.opts
.workerChoiceStrategy
=
219 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
220 this.checkValidWorkerChoiceStrategyOptions(
221 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
223 if (opts
.workerChoiceStrategyOptions
!= null) {
224 this.opts
.workerChoiceStrategyOptions
= opts
.workerChoiceStrategyOptions
226 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
227 this.opts
.enableEvents
= opts
.enableEvents
?? true
228 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
229 if (this.opts
.enableTasksQueue
) {
230 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
231 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
232 opts
.tasksQueueOptions
as TasksQueueOptions
236 throw new TypeError('Invalid pool options: must be a plain object')
240 private checkValidWorkerChoiceStrategyOptions (
241 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
244 workerChoiceStrategyOptions
!= null &&
245 !isPlainObject(workerChoiceStrategyOptions
)
248 'Invalid worker choice strategy options: must be a plain object'
252 workerChoiceStrategyOptions
?.weights
!= null &&
253 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
254 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
257 'Invalid worker choice strategy options: must have a weight for each worker node'
261 workerChoiceStrategyOptions
?.measurement
!= null &&
262 !Object.values(Measurements
).includes(
263 workerChoiceStrategyOptions
.measurement
267 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
272 private initializeEventEmitter (): void {
273 this.emitter
= new EventEmitterAsyncResource({
274 name
: `poolifier:${this.type}-${this.worker}-pool`
279 public get
info (): PoolInfo
{
284 started
: this.started
,
286 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
287 minSize
: this.minimumNumberOfWorkers
,
288 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
289 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
290 .runTime
.aggregate
&&
291 this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
292 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
293 workerNodes
: this.workerNodes
.length
,
294 idleWorkerNodes
: this.workerNodes
.reduce(
295 (accumulator
, workerNode
) =>
296 workerNode
.usage
.tasks
.executing
=== 0
301 ...(this.opts
.enableTasksQueue
=== true && {
302 stealingWorkerNodes
: this.workerNodes
.reduce(
303 (accumulator
, workerNode
) =>
304 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
308 busyWorkerNodes
: this.workerNodes
.reduce(
309 (accumulator
, _workerNode
, workerNodeKey
) =>
310 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
313 executedTasks
: this.workerNodes
.reduce(
314 (accumulator
, workerNode
) =>
315 accumulator
+ workerNode
.usage
.tasks
.executed
,
318 executingTasks
: this.workerNodes
.reduce(
319 (accumulator
, workerNode
) =>
320 accumulator
+ workerNode
.usage
.tasks
.executing
,
323 ...(this.opts
.enableTasksQueue
=== true && {
324 queuedTasks
: this.workerNodes
.reduce(
325 (accumulator
, workerNode
) =>
326 accumulator
+ workerNode
.usage
.tasks
.queued
,
330 ...(this.opts
.enableTasksQueue
=== true && {
331 maxQueuedTasks
: this.workerNodes
.reduce(
332 (accumulator
, workerNode
) =>
333 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
337 ...(this.opts
.enableTasksQueue
=== true && {
338 backPressure
: this.hasBackPressure()
340 ...(this.opts
.enableTasksQueue
=== true && {
341 stolenTasks
: this.workerNodes
.reduce(
342 (accumulator
, workerNode
) =>
343 accumulator
+ workerNode
.usage
.tasks
.stolen
,
347 failedTasks
: this.workerNodes
.reduce(
348 (accumulator
, workerNode
) =>
349 accumulator
+ workerNode
.usage
.tasks
.failed
,
352 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
353 .runTime
.aggregate
&& {
357 ...this.workerNodes
.map(
358 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
364 ...this.workerNodes
.map(
365 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
369 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
370 .runTime
.average
&& {
373 this.workerNodes
.reduce
<number[]>(
374 (accumulator
, workerNode
) =>
375 accumulator
.concat(workerNode
.usage
.runTime
.history
),
381 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
385 this.workerNodes
.reduce
<number[]>(
386 (accumulator
, workerNode
) =>
387 accumulator
.concat(workerNode
.usage
.runTime
.history
),
395 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
396 .waitTime
.aggregate
&& {
400 ...this.workerNodes
.map(
401 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
407 ...this.workerNodes
.map(
408 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
412 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
413 .waitTime
.average
&& {
416 this.workerNodes
.reduce
<number[]>(
417 (accumulator
, workerNode
) =>
418 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
424 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
425 .waitTime
.median
&& {
428 this.workerNodes
.reduce
<number[]>(
429 (accumulator
, workerNode
) =>
430 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
442 * The pool readiness boolean status.
444 private get
ready (): boolean {
446 this.workerNodes
.reduce(
447 (accumulator
, workerNode
) =>
448 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
452 ) >= this.minimumNumberOfWorkers
457 * The approximate pool utilization.
459 * @returns The pool utilization.
461 private get
utilization (): number {
462 const poolTimeCapacity
=
463 (performance
.now() - this.startTimestamp
) *
464 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
465 const totalTasksRunTime
= this.workerNodes
.reduce(
466 (accumulator
, workerNode
) =>
467 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
470 const totalTasksWaitTime
= this.workerNodes
.reduce(
471 (accumulator
, workerNode
) =>
472 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
475 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
481 * If it is `'dynamic'`, it provides the `max` property.
483 protected abstract get
type (): PoolType
488 protected abstract get
worker (): WorkerType
491 * Checks if the worker id sent in the received message from a worker is valid.
493 * @param message - The received message.
494 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
496 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
497 if (message
.workerId
== null) {
498 throw new Error('Worker message received without worker id')
499 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
501 `Worker message received from unknown worker '${message.workerId}'`
/**
 * Resolves the key (index in `this.workerNodes`) of the worker node whose
 * `info.id` strictly equals the given worker id.
 *
 * @param workerId - The worker id to look up.
 * @returns The worker node key of the first matching worker node, `-1` when none matches.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
519 public setWorkerChoiceStrategy (
520 workerChoiceStrategy
: WorkerChoiceStrategy
,
521 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
523 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
524 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
525 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
526 this.opts
.workerChoiceStrategy
528 if (workerChoiceStrategyOptions
!= null) {
529 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
531 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
532 workerNode
.resetUsage()
533 this.sendStatisticsMessageToWorker(workerNodeKey
)
538 public setWorkerChoiceStrategyOptions (
539 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
541 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
542 if (workerChoiceStrategyOptions
!= null) {
543 this.opts
.workerChoiceStrategyOptions
= workerChoiceStrategyOptions
545 this.workerChoiceStrategyContext
.setOptions(
547 this.opts
.workerChoiceStrategyOptions
552 public enableTasksQueue (
554 tasksQueueOptions
?: TasksQueueOptions
556 if (this.opts
.enableTasksQueue
=== true && !enable
) {
557 this.unsetTaskStealing()
558 this.unsetTasksStealingOnBackPressure()
559 this.flushTasksQueues()
561 this.opts
.enableTasksQueue
= enable
562 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
/**
 * Applies the given tasks queue options to the pool.
 *
 * When the tasks queue is enabled, the options are validated, merged through
 * `buildTasksQueueOptions()`, stored in the pool options, and the queue size and
 * task stealing listeners are refreshed accordingly.
 * When the tasks queue is not enabled, any stored tasks queue options are deleted.
 *
 * @param tasksQueueOptions - The tasks queue options to apply.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
  // Listeners are detached first in every case, then re-attached only when the
  // matching option is enabled (same call order as an unset/set vs. unset branch).
  this.unsetTaskStealing()
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
589 private buildTasksQueueOptions (
590 tasksQueueOptions
: TasksQueueOptions
591 ): TasksQueueOptions
{
593 ...getDefaultTasksQueueOptions(
594 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
/**
 * Assigns the given size to the `tasksQueueBackPressureSize` property of every
 * worker node currently in the pool.
 *
 * @param size - The value to assign on each worker node.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(node => {
    node.tasksQueueBackPressureSize = size
  })
}
606 private setTaskStealing (): void {
607 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
608 this.workerNodes
[workerNodeKey
].on(
610 this.handleIdleWorkerNodeEvent
615 private unsetTaskStealing (): void {
616 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
617 this.workerNodes
[workerNodeKey
].off(
619 this.handleIdleWorkerNodeEvent
624 private setTasksStealingOnBackPressure (): void {
625 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
626 this.workerNodes
[workerNodeKey
].on(
628 this.handleBackPressureEvent
633 private unsetTasksStealingOnBackPressure (): void {
634 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
635 this.workerNodes
[workerNodeKey
].off(
637 this.handleBackPressureEvent
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes reaches the maximum number
 * of workers, or the minimum number of workers when no maximum is defined.
 */
protected get full (): boolean {
  const workerNodesLimit =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= workerNodesLimit
}
655 * Whether the pool is busy or not.
657 * The pool busyness boolean status.
659 protected abstract get
busy (): boolean
662 * Whether worker nodes are executing concurrently their tasks quota or not.
664 * @returns Worker nodes busyness boolean status.
666 protected internalBusy (): boolean {
667 if (this.opts
.enableTasksQueue
=== true) {
669 this.workerNodes
.findIndex(
671 workerNode
.info
.ready
&&
672 workerNode
.usage
.tasks
.executing
<
673 (this.opts
.tasksQueueOptions
?.concurrency
as number)
678 this.workerNodes
.findIndex(
680 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
/**
 * Tells whether the worker node at the given key is busy.
 *
 * With the tasks queue enabled, a worker node is busy when its number of
 * executing tasks has reached the configured tasks queue `concurrency`.
 * Otherwise it is busy as soon as it executes at least one task.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return this.opts.enableTasksQueue === true
    ? executingTasks >= (this.opts.tasksQueueOptions?.concurrency as number)
    : executingTasks > 0
}
695 private async sendTaskFunctionOperationToWorker (
696 workerNodeKey
: number,
697 message
: MessageValue
<Data
>
698 ): Promise
<boolean> {
699 return await new Promise
<boolean>((resolve
, reject
) => {
700 const taskFunctionOperationListener
= (
701 message
: MessageValue
<Response
>
703 this.checkMessageWorkerId(message
)
704 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
706 message
.taskFunctionOperationStatus
!= null &&
707 message
.workerId
=== workerId
709 if (message
.taskFunctionOperationStatus
) {
711 } else if (!message
.taskFunctionOperationStatus
) {
714 `Task function operation '${
715 message.taskFunctionOperation as string
716 }' failed on worker ${message.workerId} with error: '${
717 message.workerError?.message as string
722 this.deregisterWorkerMessageListener(
723 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
724 taskFunctionOperationListener
728 this.registerWorkerMessageListener(
730 taskFunctionOperationListener
732 this.sendToWorker(workerNodeKey
, message
)
736 private async sendTaskFunctionOperationToWorkers (
737 message
: MessageValue
<Data
>
738 ): Promise
<boolean> {
739 return await new Promise
<boolean>((resolve
, reject
) => {
740 const responsesReceived
= new Array<MessageValue
<Response
>>()
741 const taskFunctionOperationsListener
= (
742 message
: MessageValue
<Response
>
744 this.checkMessageWorkerId(message
)
745 if (message
.taskFunctionOperationStatus
!= null) {
746 responsesReceived
.push(message
)
747 if (responsesReceived
.length
=== this.workerNodes
.length
) {
749 responsesReceived
.every(
750 message
=> message
.taskFunctionOperationStatus
=== true
755 responsesReceived
.some(
756 message
=> message
.taskFunctionOperationStatus
=== false
759 const errorResponse
= responsesReceived
.find(
760 response
=> response
.taskFunctionOperationStatus
=== false
764 `Task function operation '${
765 message.taskFunctionOperation as string
766 }' failed on worker ${
767 errorResponse?.workerId as number
769 errorResponse?.workerError?.message as string
774 this.deregisterWorkerMessageListener(
775 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
776 taskFunctionOperationsListener
781 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
782 this.registerWorkerMessageListener(
784 taskFunctionOperationsListener
786 this.sendToWorker(workerNodeKey
, message
)
792 public hasTaskFunction (name
: string): boolean {
793 for (const workerNode
of this.workerNodes
) {
795 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
796 workerNode
.info
.taskFunctionNames
.includes(name
)
805 public async addTaskFunction (
807 fn
: TaskFunction
<Data
, Response
>
808 ): Promise
<boolean> {
809 if (typeof name
!== 'string') {
810 throw new TypeError('name argument must be a string')
812 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
813 throw new TypeError('name argument must not be an empty string')
815 if (typeof fn
!== 'function') {
816 throw new TypeError('fn argument must be a function')
818 const opResult
= await this.sendTaskFunctionOperationToWorkers({
819 taskFunctionOperation
: 'add',
820 taskFunctionName
: name
,
821 taskFunction
: fn
.toString()
823 this.taskFunctions
.set(name
, fn
)
828 public async removeTaskFunction (name
: string): Promise
<boolean> {
829 if (!this.taskFunctions
.has(name
)) {
831 'Cannot remove a task function not handled on the pool side'
834 const opResult
= await this.sendTaskFunctionOperationToWorkers({
835 taskFunctionOperation
: 'remove',
836 taskFunctionName
: name
838 this.deleteTaskFunctionWorkerUsages(name
)
839 this.taskFunctions
.delete(name
)
844 public listTaskFunctionNames (): string[] {
845 for (const workerNode
of this.workerNodes
) {
847 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
848 workerNode
.info
.taskFunctionNames
.length
> 0
850 return workerNode
.info
.taskFunctionNames
/**
 * Sends a `'default'` task function operation carrying the given task function
 * name to the workers and reports its outcome.
 *
 * @param name - The task function name sent as `taskFunctionName`.
 * @returns The boolean result of `sendTaskFunctionOperationToWorkers()`.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationSucceeded = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return operationSucceeded
}
/**
 * Calls `deleteTaskFunctionWorkerUsage()` with the given task function name on
 * every worker node of the pool.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(node => {
    node.deleteTaskFunctionWorkerUsage(name)
  })
}
/**
 * Decides whether a task can be executed on the worker node at the given key.
 *
 * This holds when the worker node tasks queue is empty and its number of
 * executing tasks is below the configured tasks queue `concurrency`.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if both conditions hold, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  // A non-empty queue settles the answer before the executing count is read.
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return executing < (this.opts.tasksQueueOptions?.concurrency as number)
}
879 public async execute (
882 transferList
?: TransferListItem
[]
883 ): Promise
<Response
> {
884 return await new Promise
<Response
>((resolve
, reject
) => {
886 reject(new Error('Cannot execute a task on not started pool'))
889 if (this.destroying
) {
890 reject(new Error('Cannot execute a task on destroying pool'))
893 if (name
!= null && typeof name
!== 'string') {
894 reject(new TypeError('name argument must be a string'))
899 typeof name
=== 'string' &&
900 name
.trim().length
=== 0
902 reject(new TypeError('name argument must not be an empty string'))
905 if (transferList
!= null && !Array.isArray(transferList
)) {
906 reject(new TypeError('transferList argument must be an array'))
909 const timestamp
= performance
.now()
910 const workerNodeKey
= this.chooseWorkerNode()
911 const task
: Task
<Data
> = {
912 name
: name
?? DEFAULT_TASK_NAME
,
913 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
914 data
: data
?? ({} as Data
),
919 this.promiseResponseMap
.set(task
.taskId
as string, {
923 ...(this.emitter
!= null && {
924 asyncResource
: new AsyncResource('poolifier:task', {
925 triggerAsyncId
: this.emitter
.asyncId
,
926 requireManualDestroy
: true
931 this.opts
.enableTasksQueue
=== false ||
932 (this.opts
.enableTasksQueue
=== true &&
933 this.shallExecuteTask(workerNodeKey
))
935 this.executeTask(workerNodeKey
, task
)
937 this.enqueueTask(workerNodeKey
, task
)
943 public start (): void {
945 throw new Error('Cannot start an already started pool')
948 throw new Error('Cannot start an already starting pool')
950 if (this.destroying
) {
951 throw new Error('Cannot start a destroying pool')
955 this.workerNodes
.reduce(
956 (accumulator
, workerNode
) =>
957 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
959 ) < this.minimumNumberOfWorkers
961 this.createAndSetupWorkerNode()
963 this.starting
= false
968 public async destroy (): Promise
<void> {
970 throw new Error('Cannot destroy an already destroyed pool')
973 throw new Error('Cannot destroy an starting pool')
975 if (this.destroying
) {
976 throw new Error('Cannot destroy an already destroying pool')
978 this.destroying
= true
980 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
981 await this.destroyWorkerNode(workerNodeKey
)
984 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
985 this.emitter
?.emitDestroy()
986 this.emitter
?.removeAllListeners()
987 this.readyEventEmitted
= false
988 this.destroying
= false
992 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
993 await new Promise
<void>((resolve
, reject
) => {
994 if (this.workerNodes
?.[workerNodeKey
] == null) {
998 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
999 this.checkMessageWorkerId(message
)
1000 if (message
.kill
=== 'success') {
1002 } else if (message
.kill
=== 'failure') {
1005 `Kill message handling failed on worker ${
1006 message.workerId as number
1012 // FIXME: should be registered only once
1013 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1014 this.sendToWorker(workerNodeKey
, { kill
: true })
1019 * Terminates the worker node given its worker node key.
1021 * @param workerNodeKey - The worker node key.
1023 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1024 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1025 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1026 const workerNode
= this.workerNodes
[workerNodeKey
]
1027 await waitWorkerNodeEvents(
1031 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1032 getDefaultTasksQueueOptions(
1033 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1034 ).tasksFinishedTimeout
1036 await this.sendKillMessageToWorker(workerNodeKey
)
1037 await workerNode
.terminate()
1041 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1042 * Can be overridden.
1046 protected setupHook (): void {
1047 /* Intentionally empty */
1051 * Should return whether the worker is the main worker or not.
1053 protected abstract isMain (): boolean
1056 * Hook executed before the worker task execution.
1057 * Can be overridden.
1059 * @param workerNodeKey - The worker node key.
1060 * @param task - The task to execute.
1062 protected beforeTaskExecutionHook (
1063 workerNodeKey
: number,
1066 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1067 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1068 ++workerUsage
.tasks
.executing
1069 updateWaitTimeWorkerUsage(
1070 this.workerChoiceStrategyContext
,
1076 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1077 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1081 const taskFunctionWorkerUsage
= this.workerNodes
[
1083 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1084 ++taskFunctionWorkerUsage
.tasks
.executing
1085 updateWaitTimeWorkerUsage(
1086 this.workerChoiceStrategyContext
,
1087 taskFunctionWorkerUsage
,
1094 * Hook executed after the worker task execution.
1095 * Can be overridden.
1097 * @param workerNodeKey - The worker node key.
1098 * @param message - The received message.
1100 protected afterTaskExecutionHook (
1101 workerNodeKey
: number,
1102 message
: MessageValue
<Response
>
1104 let needWorkerChoiceStrategyUpdate
= false
1105 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1106 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1107 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1108 updateRunTimeWorkerUsage(
1109 this.workerChoiceStrategyContext
,
1113 updateEluWorkerUsage(
1114 this.workerChoiceStrategyContext
,
1118 needWorkerChoiceStrategyUpdate
= true
1121 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1122 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1123 message
.taskPerformance
?.name
as string
1126 const taskFunctionWorkerUsage
= this.workerNodes
[
1128 ].getTaskFunctionWorkerUsage(
1129 message
.taskPerformance
?.name
as string
1131 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1132 updateRunTimeWorkerUsage(
1133 this.workerChoiceStrategyContext
,
1134 taskFunctionWorkerUsage
,
1137 updateEluWorkerUsage(
1138 this.workerChoiceStrategyContext
,
1139 taskFunctionWorkerUsage
,
1142 needWorkerChoiceStrategyUpdate
= true
1144 if (needWorkerChoiceStrategyUpdate
) {
1145 this.workerChoiceStrategyContext
.update(workerNodeKey
)
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * The update is required when the worker info exposes an array of task function
 * names holding more than two entries.
 * NOTE(review): the threshold of two presumably discounts built-in/default
 * entries in the names list — confirm against the worker side.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1165 * Chooses a worker node for the next task.
1167 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1169 * @returns The chosen worker node key
1171 private chooseWorkerNode (): number {
1172 if (this.shallCreateDynamicWorker()) {
1173 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1175 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1177 return workerNodeKey
1180 return this.workerChoiceStrategyContext
.execute()
1184 * Conditions for dynamic worker creation.
1186 * @returns Whether to create a dynamic worker or not.
1188 protected abstract shallCreateDynamicWorker (): boolean
1191 * Sends a message to worker given its worker node key.
1193 * @param workerNodeKey - The worker node key.
1194 * @param message - The message.
1195 * @param transferList - The optional array of transferable objects.
1197 protected abstract sendToWorker (
1198 workerNodeKey
: number,
1199 message
: MessageValue
<Data
>,
1200 transferList
?: TransferListItem
[]
1204 * Creates a new, completely set up worker node.
1206 * @returns New, completely set up worker node key.
1208 protected createAndSetupWorkerNode (): number {
1209 const workerNode
= this.createWorkerNode()
1210 workerNode
.registerWorkerEventHandler(
1212 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1214 workerNode
.registerWorkerEventHandler(
1216 this.opts
.messageHandler
?? EMPTY_FUNCTION
1218 workerNode
.registerWorkerEventHandler(
1220 this.opts
.errorHandler
?? EMPTY_FUNCTION
1222 workerNode
.registerWorkerEventHandler('error', (error
: Error) => {
1223 workerNode
.info
.ready
= false
1224 this.emitter
?.emit(PoolEvents
.error
, error
)
1228 this.opts
.restartWorkerOnError
=== true
1230 if (workerNode
.info
.dynamic
) {
1231 this.createAndSetupDynamicWorkerNode()
1233 this.createAndSetupWorkerNode()
1239 this.opts
.enableTasksQueue
=== true
1241 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
1243 workerNode
?.terminate().catch(error
=> {
1244 this.emitter
?.emit(PoolEvents
.error
, error
)
1247 workerNode
.registerWorkerEventHandler(
1249 this.opts
.exitHandler
?? EMPTY_FUNCTION
1251 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1252 this.removeWorkerNode(workerNode
)
1254 const workerNodeKey
= this.addWorkerNode(workerNode
)
1255 this.afterWorkerNodeSetup(workerNodeKey
)
1256 return workerNodeKey
/**
 * Creates a worker node through `createAndSetupWorkerNode()`, flags it as
 * dynamic and finishes its setup: kill message listener, active check request
 * and registration of the pool task functions.
 *
 * @returns The key of the newly created dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // A soft kill is only honoured when the worker node executes nothing and,
    // if the tasks queue is enabled, has nothing queued either.
    const hasNoPendingWork = (): boolean =>
      senderUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0))
    const killRequested =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    if (!killRequested) {
      return
    }
    // Kill message received from worker: flag the node as not ready right
    // away, then destroy it asynchronously.
    this.flagWorkerNodeAsNotReady(senderWorkerNodeKey)
    this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, { checkActive: true })
  // Iterating an empty map is a no-op, so no explicit size check is needed.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  dynamicWorkerInfo.dynamic = true
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    // The strategy policy asks for dynamic worker nodes to be usable at once.
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
/**
 * Attaches a message listener to the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The callback invoked with each message value.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Attaches a one-shot message listener to the worker identified by its worker
 * node key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The callback invoked with the message value.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Detaches a message listener from the worker identified by its worker node
 * key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback to remove.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Hook run once a worker node has been newly created: registers the pool
 * message listener, sends the startup and statistics messages and, when the
 * tasks queue is enabled, subscribes the task stealing handlers.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  // Send the startup message, then the statistics message, to the worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Task stealing only makes sense with a tasks queue.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent
    )
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleBackPressureEvent
    )
  }
}
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
 * Sends the statistics message to the worker identified by its worker node
 * key. The message carries the `runTime` and `elu` aggregate flags read from
 * the worker choice strategy task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    }
  })
}
/**
 * Tells whether task stealing is pointless: the pool has at most one worker
 * node, or the pool information reports no queued tasks.
 *
 * @returns `true` if no task can be stolen, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
/**
 * Dispatches a task to a worker node: executes it at once when
 * `shallExecuteTask()` allows it, enqueues it on the worker node otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute or enqueue.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
/**
 * Moves every task queued on the given worker node to the other worker nodes.
 *
 * For each queued task, the destination is the ready worker node, other than
 * the source, with the fewest queued tasks. The source worker node is never
 * selected as destination: handing a task back to the queue being drained
 * would either loop forever or send the task to the worker being torn down.
 * If no ready destination exists, the redistribution stops and the remaining
 * tasks are left in the source queue.
 *
 * @param workerNodeKey - The key of the worker node whose queue is drained.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // -1 means "no eligible destination found yet".
    const destinationWorkerNodeKey = this.workerNodes.reduce(
      (minWorkerNodeKey, candidate, candidateKey, workerNodes) => {
        if (candidateKey === workerNodeKey || !candidate.info.ready) {
          return minWorkerNodeKey
        }
        return minWorkerNodeKey === -1 ||
          candidate.usage.tasks.queued <
            workerNodes[minWorkerNodeKey].usage.tasks.queued
          ? candidateKey
          : minWorkerNodeKey
      },
      -1
    )
    if (destinationWorkerNodeKey === -1) {
      // No ready worker node can take over the queued tasks.
      return
    }
    this.handleTask(
      destinationWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
/**
 * Increments the stolen tasks counter of a worker node and, when task function
 * usage is tracked for it, the stolen tasks counter of the given task function.
 *
 * @param workerNodeKey - The key of the worker node that stole a task.
 * @param taskName - The name of the stolen task function.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (stealingWorkerNode?.usage != null) {
    ++stealingWorkerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = stealingWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionUsage.tasks.stolen
}
/**
 * Increments the sequentially stolen tasks counter of a worker node, if the
 * worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage == null) {
    return
  }
  ++usage.tasks.sequentiallyStolen
}
/**
 * Increments the sequentially stolen tasks counter of a task function on a
 * worker node, when task function usage is tracked for that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = stealingWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionUsage.tasks.sequentiallyStolen
}
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node, if
 * the worker node and its usage exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage == null) {
    return
  }
  usage.tasks.sequentiallyStolen = 0
}
/**
 * Resets to zero the sequentially stolen tasks counter of a task function on a
 * worker node, when task function usage is tracked for that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealingWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = stealingWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  taskFunctionUsage.tasks.sequentiallyStolen = 0
}
/**
 * Handler of the `idleWorkerNode` worker node event: makes the idle worker
 * node steal a task, then re-invokes itself after a delay computed from the
 * number of sequentially stolen tasks.
 *
 * @param eventDetail - The worker node event detail; `workerNodeKey` is required.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no `workerNodeKey`.
 */
private readonly handleIdleWorkerNodeEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey property must be defined'
    )
  }
  const markStealing = (stealing: boolean): void => {
    this.getWorkerInfo(workerNodeKey).stealing = stealing
  }
  // Stop when nothing can be stolen or when more than half of the worker
  // nodes are already stealing.
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes as number) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    if (previousStolenTask != null) {
      markStealing(false)
    }
    return
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // The worker node has work again after a stealing sequence: end the
  // sequence and reset the sequentially stolen counters.
  if (
    previousStolenTask != null &&
    tasksUsage.sequentiallyStolen > 0 &&
    (tasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    markStealing(false)
    const taskFunctionNames = this.workerNodes[workerNodeKey].info
      .taskFunctionNames as string[]
    for (const taskName of taskFunctionNames) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  markStealing(true)
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    const stolenTaskName = stolenTask.name as string
    const taskFunctionTasksUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(stolenTaskName)?.tasks as TaskStatistics
    // The per task function sequence continues on its first steal or when the
    // same task function is stolen again; any other case resets it.
    if (
      taskFunctionTasksUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksUsage.sequentiallyStolen > 0)
    ) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    }
  }
  sleep(exponentialDelay(tasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
/**
 * Makes a worker node steal one task from another worker node.
 *
 * The victim is the first worker node, in descending order of queued tasks,
 * that is ready, not stealing, has queued tasks and is not the stealing worker
 * node itself. Candidates are compared by identity with the stealing worker
 * node: positions in the sorted copy are not pool worker node keys and must
 * not be compared with `workerNodeKey`.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, or `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      candidate =>
        candidate.info.ready &&
        !candidate.info.stealing &&
        candidate !== stealingWorkerNode &&
        candidate.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask() as Task<Data>
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name as string)
  return task
}
/**
 * Handler of the `backPressure` worker node event: hands tasks queued on the
 * worker node under back pressure over to less loaded worker nodes.
 *
 * Candidates are visited in ascending order of queued tasks. Each candidate
 * that is ready, not stealing, not the source worker node and whose queue is
 * below the configured queue size minus one takes one task from the source.
 * The pool worker node key of each candidate is looked up in
 * `this.workerNodes`: positions in the sorted copy are not pool keys.
 *
 * @param eventDetail - The worker node event detail carrying the source worker id.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes as number) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    // The worker node under back pressure is no longer part of the pool.
    return
  }
  const sortedWorkerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of sortedWorkerNodes) {
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      // Nothing left to hand over.
      break
    }
    if (
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      // Resolve the pool key of the candidate, not its sorted position.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        continue
      }
      workerNode.info.stealing = true
      const task = sourceWorkerNode.popTask() as Task<Data>
      this.handleTask(workerNodeKey, task)
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
      workerNode.info.stealing = false
    }
  }
}
/**
 * Message listener registered on each worker. Routes a worker message to the
 * ready response handler, to the task execution response handler, or stores
 * the task function names it carries.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
/**
 * Handles a worker ready response: records the ready flag and the task
 * function names on the worker information, then emits the pool `ready` event
 * the first time the pool reports itself ready.
 *
 * @param message - The ready response received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready === false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const readyWorkerInfo = this.getWorkerInfo(readyWorkerNodeKey)
  readyWorkerInfo.ready = ready as boolean
  readyWorkerInfo.taskFunctionNames = taskFunctionNames
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  // Emit the pool ready event only once.
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
/**
 * Handles a task execution response: settles the promise bound to the task
 * id, runs the after task execution hook and, when the tasks queue is enabled,
 * executes the next queued task and emits the `idleWorkerNode` event if the
 * worker node has nothing left to do.
 *
 * The worker node may no longer be part of the pool when its response
 * arrives, so every access to it after the promise is settled is guarded.
 *
 * @param message - The task execution response received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    // No pending promise is bound to this task id.
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    // Settle within the async scope captured at task submission, if any.
    asyncResource != null
      ? asyncResource.runInAsyncScope(
        reject,
        this.emitter,
        workerError.message
      )
      : reject(workerError.message)
  } else {
    asyncResource != null
      ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
      : resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    workerNode == null
  ) {
    return
  }
  const workerNodeTasksUsage = workerNode.usage.tasks
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    workerNodeTasksUsage.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  if (
    workerNodeTasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    workerNodeTasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idleWorkerNode', {
      workerId: workerId as number,
      workerNodeKey
    })
  }
}
/**
 * Emits the pool `busy` event, with the pool information, when the pool is
 * busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
/**
 * Emits the pool `backPressure` event, with the pool information, when
 * `hasBackPressure()` reports back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
/**
 * Checks the pool state and emits the dynamic worker creation events.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
 * Looks up the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
/**
 * Instantiates a worker node. The tasks queue back pressure size comes from
 * the tasks queue options, or from the default tasks queue options computed
 * for the pool size when it is not set.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize
    }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
/**
 * Removes the given worker node from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker node is not in the pool.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
/**
 * Sets the `ready` flag of a worker node information to `false`.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
/**
 * Tells whether a worker node has back pressure. Always `false` when the
 * tasks queue is not enabled.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node has back pressure, `false` otherwise.
 */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and
 * no worker node is free of back pressure.
 *
 * @returns `true` if every worker node has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
/**
 * Runs the before task execution hook, sends the task, with its transfer
 * list, to the worker and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues a task on a worker node and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
/**
 * Dequeues a task from a worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
/**
 * Gets the size of a worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const queriedWorkerNode = this.workerNodes[workerNodeKey]
  return queriedWorkerNode.tasksQueueSize()
}
/**
 * Executes every task queued on a worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}