1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
5 import { AsyncResource
} from
'node:async_hooks'
8 PromiseResponseWrapper
,
10 } from
'../utility-types'
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
25 import { KillBehaviors
} from
'../worker/worker-options'
26 import type { TaskFunction
} from
'../worker/task-functions'
34 type TasksQueueOptions
41 WorkerNodeEventDetail
,
46 type MeasurementStatisticsRequirements
,
48 WorkerChoiceStrategies
,
49 type WorkerChoiceStrategy
,
50 type WorkerChoiceStrategyOptions
51 } from
'./selection-strategies/selection-strategies-types'
52 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
53 import { version
} from
'./version'
54 import { WorkerNode
} from
'./worker-node'
57 checkValidTasksQueueOptions
,
58 checkValidWorkerChoiceStrategy
,
59 updateMeasurementStatistics
60 // waitWorkerNodeEvents
64 * Base class that implements some shared logic for all poolifier pools.
66 * @typeParam Worker - Type of worker which manages this pool.
67 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
68 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
70 export abstract class AbstractPool
<
71 Worker
extends IWorker
,
74 > implements IPool
<Worker
, Data
, Response
> {
76 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
79 public emitter
?: EventEmitterAsyncResource
82 * Dynamic pool maximum size property placeholder.
84 protected readonly max
?: number
87 * The task execution response promise map:
88 * - `key`: The message id of each submitted task.
89 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
91 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
93 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
94 new Map
<string, PromiseResponseWrapper
<Response
>>()
97 * Worker choice strategy context referencing a worker choice algorithm implementation.
99 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
106 * The task functions added at runtime map:
107 * - `key`: The task function name.
108 * - `value`: The task function itself.
110 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
113 * Whether the pool is started or not.
115 private started
: boolean
117 * Whether the pool is starting or not.
119 private starting
: boolean
121 * Whether the pool is destroying or not.
123 private destroying
: boolean
125 * Whether the pool ready event has been emitted or not.
127 private readyEventEmitted
: boolean
129 * The start timestamp of the pool.
131 private readonly startTimestamp
134 * Constructs a new poolifier pool.
136 * @param numberOfWorkers - Number of workers that this pool should manage.
137 * @param filePath - Path to the worker file.
138 * @param opts - Options for the pool.
141 protected readonly numberOfWorkers
: number,
142 protected readonly filePath
: string,
143 protected readonly opts
: PoolOptions
<Worker
>
145 if (!this.isMain()) {
147 'Cannot start a pool from a worker with the same type as the pool'
150 checkFilePath(this.filePath
)
151 this.checkNumberOfWorkers(this.numberOfWorkers
)
152 this.checkPoolOptions(this.opts
)
154 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
155 this.executeTask
= this.executeTask
.bind(this)
156 this.enqueueTask
= this.enqueueTask
.bind(this)
158 if (this.opts
.enableEvents
=== true) {
159 this.initializeEventEmitter()
161 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
167 this.opts
.workerChoiceStrategy
,
168 this.opts
.workerChoiceStrategyOptions
173 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
176 this.starting
= false
177 this.destroying
= false
178 this.readyEventEmitted
= false
179 if (this.opts
.startWorkers
=== true) {
183 this.startTimestamp
= performance
.now()
186 private checkNumberOfWorkers (numberOfWorkers
: number): void {
187 if (numberOfWorkers
== null) {
189 'Cannot instantiate a pool without specifying the number of workers'
191 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
193 'Cannot instantiate a pool with a non safe integer number of workers'
195 } else if (numberOfWorkers
< 0) {
196 throw new RangeError(
197 'Cannot instantiate a pool with a negative number of workers'
199 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
200 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
/**
 * Validates the given pool options and fills in a default for every option left unset.
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  // Reject anything that is not a plain object before touching any option
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true
  // The strategy is validated as given, then defaulted to round robin when unset
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  // User supplied strategy options override the defaults key by key
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only checked and built when the queue is enabled
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
233 private checkValidWorkerChoiceStrategyOptions (
234 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
237 workerChoiceStrategyOptions
!= null &&
238 !isPlainObject(workerChoiceStrategyOptions
)
241 'Invalid worker choice strategy options: must be a plain object'
245 workerChoiceStrategyOptions
?.retries
!= null &&
246 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
249 'Invalid worker choice strategy options: retries must be an integer'
253 workerChoiceStrategyOptions
?.retries
!= null &&
254 workerChoiceStrategyOptions
.retries
< 0
256 throw new RangeError(
257 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
261 workerChoiceStrategyOptions
?.weights
!= null &&
262 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
265 'Invalid worker choice strategy options: must have a weight for each worker node'
269 workerChoiceStrategyOptions
?.measurement
!= null &&
270 !Object.values(Measurements
).includes(
271 workerChoiceStrategyOptions
.measurement
275 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
287 public get
info (): PoolInfo
{
292 started
: this.started
,
294 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
295 minSize
: this.minSize
,
296 maxSize
: this.maxSize
,
297 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
298 .runTime
.aggregate
&&
299 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
300 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
301 workerNodes
: this.workerNodes
.length
,
302 idleWorkerNodes
: this.workerNodes
.reduce(
303 (accumulator
, workerNode
) =>
304 workerNode
.usage
.tasks
.executing
=== 0
309 busyWorkerNodes
: this.workerNodes
.reduce(
310 (accumulator
, _workerNode
, workerNodeKey
) =>
311 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
314 executedTasks
: this.workerNodes
.reduce(
315 (accumulator
, workerNode
) =>
316 accumulator
+ workerNode
.usage
.tasks
.executed
,
319 executingTasks
: this.workerNodes
.reduce(
320 (accumulator
, workerNode
) =>
321 accumulator
+ workerNode
.usage
.tasks
.executing
,
324 ...(this.opts
.enableTasksQueue
=== true && {
325 queuedTasks
: this.workerNodes
.reduce(
326 (accumulator
, workerNode
) =>
327 accumulator
+ workerNode
.usage
.tasks
.queued
,
331 ...(this.opts
.enableTasksQueue
=== true && {
332 maxQueuedTasks
: this.workerNodes
.reduce(
333 (accumulator
, workerNode
) =>
334 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
338 ...(this.opts
.enableTasksQueue
=== true && {
339 backPressure
: this.hasBackPressure()
341 ...(this.opts
.enableTasksQueue
=== true && {
342 stolenTasks
: this.workerNodes
.reduce(
343 (accumulator
, workerNode
) =>
344 accumulator
+ workerNode
.usage
.tasks
.stolen
,
348 failedTasks
: this.workerNodes
.reduce(
349 (accumulator
, workerNode
) =>
350 accumulator
+ workerNode
.usage
.tasks
.failed
,
353 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
354 .runTime
.aggregate
&& {
358 ...this.workerNodes
.map(
359 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
365 ...this.workerNodes
.map(
366 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
370 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
371 .runTime
.average
&& {
374 this.workerNodes
.reduce
<number[]>(
375 (accumulator
, workerNode
) =>
376 accumulator
.concat(workerNode
.usage
.runTime
.history
),
382 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
386 this.workerNodes
.reduce
<number[]>(
387 (accumulator
, workerNode
) =>
388 accumulator
.concat(workerNode
.usage
.runTime
.history
),
396 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
397 .waitTime
.aggregate
&& {
401 ...this.workerNodes
.map(
402 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
408 ...this.workerNodes
.map(
409 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
413 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
414 .waitTime
.average
&& {
417 this.workerNodes
.reduce
<number[]>(
418 (accumulator
, workerNode
) =>
419 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
425 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
426 .waitTime
.median
&& {
429 this.workerNodes
.reduce
<number[]>(
430 (accumulator
, workerNode
) =>
431 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
443 * The pool readiness boolean status.
445 private get
ready (): boolean {
447 this.workerNodes
.reduce(
448 (accumulator
, workerNode
) =>
449 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
/**
 * The approximate pool utilization.
 *
 * Computed as the summed run time and wait time aggregates of all worker nodes
 * divided by the time elapsed since the pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    // A missing aggregate counts as zero
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
481 * If it is `'dynamic'`, it provides the `max` property.
483 protected abstract get
type (): PoolType
488 protected abstract get
worker (): WorkerType
491 * The pool minimum size.
493 protected get
minSize (): number {
494 return this.numberOfWorkers
498 * The pool maximum size.
500 protected get
maxSize (): number {
501 return this.max
?? this.numberOfWorkers
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // A worker id that maps to no worker node is treated as invalid too
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
/**
 * Sets the worker choice strategy used by the pool and, optionally, its options.
 * Every worker node usage is then reset and a statistics message is sent to each worker.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
/**
 * Sets the worker choice strategy options, merged over the default ones,
 * and forwards them to the worker choice strategy context.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
/**
 * Enables or disables the worker node tasks queue.
 *
 * When an enabled tasks queue gets disabled, task stealing handlers are unset
 * and the tasks queues are flushed before the new setting is applied.
 *
 * @param enable - Whether to enable or disable the worker node tasks queue.
 * @param tasksQueueOptions - The worker node tasks queue options.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isDisabling = this.opts.enableTasksQueue === true && !enable
  if (isDisabling) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
/**
 * Sets the worker node tasks queue options.
 *
 * When the tasks queue is enabled, the options are checked, built and applied:
 * queue size, task stealing and tasks stealing on back pressure.
 * When it is disabled, any previously stored tasks queue options are removed.
 *
 * @param tasksQueueOptions - The worker node tasks queue options.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueSize(builtTasksQueueOptions.size as number)
  // Handlers are always unset first, then set again only when the option is on
  const taskStealing = builtTasksQueueOptions.taskStealing === true
  this.unsetTaskStealing()
  if (taskStealing) {
    this.setTaskStealing()
  }
  const tasksStealingOnBackPressure =
    builtTasksQueueOptions.tasksStealingOnBackPressure === true
  this.unsetTasksStealingOnBackPressure()
  if (tasksStealingOnBackPressure) {
    this.setTasksStealingOnBackPressure()
  }
}
615 private buildTasksQueueOptions (
616 tasksQueueOptions
: TasksQueueOptions
617 ): TasksQueueOptions
{
620 size
: Math.pow(this.maxSize
, 2),
623 tasksStealingOnBackPressure
: true
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size to apply.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
635 private setTaskStealing (): void {
636 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
637 this.workerNodes
[workerNodeKey
].on(
639 this.handleIdleWorkerNodeEvent
644 private unsetTaskStealing (): void {
645 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
646 this.workerNodes
[workerNodeKey
].off(
648 this.handleIdleWorkerNodeEvent
653 private setTasksStealingOnBackPressure (): void {
654 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
655 this.workerNodes
[workerNodeKey
].on(
657 this.handleBackPressureEvent
662 private unsetTasksStealingOnBackPressure (): void {
663 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
664 this.workerNodes
[workerNodeKey
].off(
666 this.handleBackPressureEvent
672 * Whether the pool is full or not.
674 * The pool filling boolean status.
676 protected get
full (): boolean {
677 return this.workerNodes
.length
>= this.maxSize
681 * Whether the pool is busy or not.
683 * The pool busyness boolean status.
685 protected abstract get
busy (): boolean
688 * Whether worker nodes are executing concurrently their tasks quota or not.
690 * @returns Worker nodes busyness boolean status.
692 protected internalBusy (): boolean {
693 if (this.opts
.enableTasksQueue
=== true) {
695 this.workerNodes
.findIndex(
697 workerNode
.info
.ready
&&
698 workerNode
.usage
.tasks
.executing
<
699 (this.opts
.tasksQueueOptions
?.concurrency
as number)
704 this.workerNodes
.findIndex(
706 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
/**
 * Whether the worker node given its worker node key is busy or not.
 *
 * With the tasks queue enabled, a worker node is busy once its executing tasks count
 * reaches the tasks queue concurrency; otherwise it is busy as soon as it executes one task.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  return this.opts.enableTasksQueue === true
    ? this.workerNodes[workerNodeKey].usage.tasks.executing >=
        (this.opts.tasksQueueOptions?.concurrency as number)
    : this.workerNodes[workerNodeKey].usage.tasks.executing > 0
}
/**
 * Sends a task function operation message to one worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise settled from the `taskFunctionOperationStatus` answered by the worker;
 * it rejects with an error built from the worker response when the status is falsy.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore messages that are not a task function operation answer from the targeted worker
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      // The answer has been handled: the listener is no longer needed
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
/**
 * Sends a task function operation message to every worker node and waits until
 * as many answers as there are worker nodes have been received.
 *
 * The same listener is registered on each worker node. Once all answers are in,
 * it is deregistered from every worker node, not only from the node that answered last,
 * so that no listener is left behind on the other nodes.
 *
 * NOTE(review): with zero worker nodes nothing is registered or sent, so the returned
 * promise never settles; confirm whether callers can reach this state.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when every worker answered with a `true` status;
 * it rejects with an error built from the first failed response otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      // Only task function operation answers are collected
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      if (
        responsesReceived.every(
          received => received.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      } else if (
        responsesReceived.some(
          received => received.taskFunctionOperationStatus === false
        )
      ) {
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${
              errorResponse?.workerId as number
            } with error: '${
              errorResponse?.workerError?.message as string
            }'`
          )
        )
      }
      // The listener was registered on every worker node: remove it from all of them.
      // Presumably deregistering from a node that never had it is a no-op; verify against the implementations.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
/**
 * Whether at least one worker node lists the given task function name.
 *
 * @param name - The task function name.
 * @returns `true` if a worker node has the task function name, `false` otherwise.
 */
public hasTaskFunction (name: string): boolean {
  return this.workerNodes.some(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.includes(name)
  )
}
/**
 * Adds a task function: its source is sent to the workers and, once the operation
 * has completed, the function is recorded in the pool side task functions map.
 *
 * @param name - The task function name.
 * @param fn - The task function.
 * @returns The result of the task function operation sent to the workers.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` argument is not a string or is an empty string.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `fn` argument is not a function.
 */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // name is known to be a string from here on
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  this.taskFunctions.set(name, fn)
  return opResult
}
854 public async removeTaskFunction (name
: string): Promise
<boolean> {
855 if (!this.taskFunctions
.has(name
)) {
857 'Cannot remove a task function not handled on the pool side'
860 const opResult
= await this.sendTaskFunctionOperationToWorkers({
861 taskFunctionOperation
: 'remove',
862 taskFunctionName
: name
864 this.deleteTaskFunctionWorkerUsages(name
)
865 this.taskFunctions
.delete(name
)
/**
 * Lists the task function names, taken from the first worker node
 * that reports a non empty array of task function names.
 *
 * @returns The task function names, or an empty array if no worker node reports any.
 */
public listTaskFunctionNames (): string[] {
  const workerNodeWithNames = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
/**
 * Sets the default task function by sending a `'default'` task function operation to the workers.
 *
 * @param name - The task function name.
 * @returns The result of the task function operation sent to the workers.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return opResult
}
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
/**
 * Whether a task shall be executed right away on the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and its executing tasks count
 * is below the tasks queue concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  return this.workerNodes[workerNodeKey].usage.tasks.executing < concurrency
}
905 public async execute (
908 transferList
?: TransferListItem
[]
909 ): Promise
<Response
> {
910 return await new Promise
<Response
>((resolve
, reject
) => {
912 reject(new Error('Cannot execute a task on not started pool'))
915 if (this.destroying
) {
916 reject(new Error('Cannot execute a task on destroying pool'))
919 if (name
!= null && typeof name
!== 'string') {
920 reject(new TypeError('name argument must be a string'))
925 typeof name
=== 'string' &&
926 name
.trim().length
=== 0
928 reject(new TypeError('name argument must not be an empty string'))
931 if (transferList
!= null && !Array.isArray(transferList
)) {
932 reject(new TypeError('transferList argument must be an array'))
935 const timestamp
= performance
.now()
936 const workerNodeKey
= this.chooseWorkerNode()
937 const task
: Task
<Data
> = {
938 name
: name
?? DEFAULT_TASK_NAME
,
939 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
940 data
: data
?? ({} as Data
),
945 this.promiseResponseMap
.set(task
.taskId
as string, {
949 ...(this.emitter
!= null && {
950 asyncResource
: new AsyncResource('poolifier:task', {
951 triggerAsyncId
: this.emitter
.asyncId
,
952 requireManualDestroy
: true
957 this.opts
.enableTasksQueue
=== false ||
958 (this.opts
.enableTasksQueue
=== true &&
959 this.shallExecuteTask(workerNodeKey
))
961 this.executeTask(workerNodeKey
, task
)
963 this.enqueueTask(workerNodeKey
, task
)
969 public start (): void {
971 throw new Error('Cannot start an already started pool')
974 throw new Error('Cannot start an already starting pool')
976 if (this.destroying
) {
977 throw new Error('Cannot start a destroying pool')
981 this.workerNodes
.reduce(
982 (accumulator
, workerNode
) =>
983 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
985 ) < this.numberOfWorkers
987 this.createAndSetupWorkerNode()
989 this.starting
= false
994 public async destroy (): Promise
<void> {
996 throw new Error('Cannot destroy an already destroyed pool')
999 throw new Error('Cannot destroy an starting pool')
1001 if (this.destroying
) {
1002 throw new Error('Cannot destroy an already destroying pool')
1004 this.destroying
= true
1006 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
1007 await this.destroyWorkerNode(workerNodeKey
)
1010 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
1011 this.emitter
?.emitDestroy()
1012 this.emitter
?.removeAllListeners()
1013 this.readyEventEmitted
= false
1014 this.destroying
= false
1015 this.started
= false
1018 protected async sendKillMessageToWorker (
1019 workerNodeKey
: number
1021 await new Promise
<void>((resolve
, reject
) => {
1022 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1023 this.checkMessageWorkerId(message
)
1024 if (message
.kill
=== 'success') {
1026 } else if (message
.kill
=== 'failure') {
1029 `Kill message handling failed on worker ${
1030 message.workerId as number
1036 // FIXME: should be registered only once
1037 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1038 this.sendToWorker(workerNodeKey
, { kill
: true })
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is flagged as not ready and its tasks queue is flushed,
 * then a kill message is sent to the worker before the worker node is terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  this.flushTasksQueue(workerNodeKey)
  // Resolve the worker node before awaiting anything
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  // FIXME: wait for tasks to be finished
  // (see the commented out waitWorkerNodeEvents import, based on workerNode.usage.tasks.executing)
  await this.sendKillMessageToWorker(workerNodeKey)
  await targetWorkerNode.terminate()
}
1062 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1063 * Can be overridden.
1067 protected setupHook (): void {
1068 /* Intentionally empty */
1072 * Should return whether the worker is the main worker or not.
1074 protected abstract isMain (): boolean
1077 * Hook executed before the worker task execution.
1078 * Can be overridden.
1080 * @param workerNodeKey - The worker node key.
1081 * @param task - The task to execute.
1083 protected beforeTaskExecutionHook (
1084 workerNodeKey
: number,
1087 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1088 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1089 ++workerUsage
.tasks
.executing
1090 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1093 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1094 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1098 const taskFunctionWorkerUsage
= this.workerNodes
[
1100 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1101 ++taskFunctionWorkerUsage
.tasks
.executing
1102 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1107 * Hook executed after the worker task execution.
1108 * Can be overridden.
1110 * @param workerNodeKey - The worker node key.
1111 * @param message - The received message.
1113 protected afterTaskExecutionHook (
1114 workerNodeKey
: number,
1115 message
: MessageValue
<Response
>
1117 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1118 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1119 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1120 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1121 this.updateEluWorkerUsage(workerUsage
, message
)
1124 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1125 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1126 message
.taskPerformance
?.name
as string
1129 const taskFunctionWorkerUsage
= this.workerNodes
[
1131 ].getTaskFunctionWorkerUsage(
1132 message
.taskPerformance
?.name
as string
1134 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1135 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1136 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * This is the case when the worker info exposes an array of more than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
/**
 * Updates the tasks counters of a worker usage after a task execution:
 * decrements the executing count when it is positive, then increments
 * either the executed or the failed count.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // Never let the executing count go below zero
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  // A worker error marks the task as failed, anything else as executed
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
/**
 * Updates the run time measurement statistics of a worker usage.
 * Nothing is updated when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const runTimeRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    // A missing run time measurement counts as zero
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
/**
 * Updates the wait time measurement statistics of a worker usage.
 *
 * The task wait time is the time elapsed between the task timestamp and now;
 * a task without timestamp gets a wait time of zero.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime
  )
}
1200 private updateEluWorkerUsage (
1201 workerUsage
: WorkerUsage
,
1202 message
: MessageValue
<Response
>
1204 if (message
.workerError
!= null) {
1207 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1208 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1209 updateMeasurementStatistics(
1210 workerUsage
.elu
.active
,
1211 eluTaskStatisticsRequirements
,
1212 message
.taskPerformance
?.elu
?.active
?? 0
1214 updateMeasurementStatistics(
1215 workerUsage
.elu
.idle
,
1216 eluTaskStatisticsRequirements
,
1217 message
.taskPerformance
?.elu
?.idle
?? 0
1219 if (eluTaskStatisticsRequirements
.aggregate
) {
1220 if (message
.taskPerformance
?.elu
!= null) {
1221 if (workerUsage
.elu
.utilization
!= null) {
1222 workerUsage
.elu
.utilization
=
1223 (workerUsage
.elu
.utilization
+
1224 message
.taskPerformance
.elu
.utilization
) /
1227 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * When a dynamic worker shall be created, it is created first and used directly
 * if the strategy policy allows dynamic worker usage.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full
 * and its worker nodes are internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1262 * Sends a message to worker given its worker node key.
1264 * @param workerNodeKey - The worker node key.
1265 * @param message - The message.
1266 * @param transferList - The optional array of transferable objects.
1268 protected abstract sendToWorker (
1269 workerNodeKey
: number,
1270 message
: MessageValue
<Data
>,
1271 transferList
?: TransferListItem
[]
1275 * Creates a new, completely set up worker node.
1277 * @returns New, completely set up worker node key.
// Builds a worker node via `createWorkerNode()`, wires its worker event handlers, adds it to
// `workerNodes`, runs `afterWorkerNodeSetup()` on it and returns its key.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1282, 1286, 1290, 1296-1299, 1316), so several event names and part of the restart condition
// are not visible here.
1279 protected createAndSetupWorkerNode (): number {
1280 const workerNode
= this.createWorkerNode()
// User-supplied handlers from the pool options; EMPTY_FUNCTION is used when none is given.
// NOTE(review): the event name argument of each of these registrations is elided in this listing.
1281 workerNode
.registerWorkerEventHandler(
1283 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1285 workerNode
.registerWorkerEventHandler(
1287 this.opts
.messageHandler
?? EMPTY_FUNCTION
1289 workerNode
.registerWorkerEventHandler(
1291 this.opts
.errorHandler
?? EMPTY_FUNCTION
// Internal 'error' handler: flags the worker node as not ready and forwards the error as a
// pool `error` event.
1293 workerNode
.registerWorkerEventHandler('error', (error
: Error) => {
1294 workerNode
.info
.ready
= false
1295 this.emitter
?.emit(PoolEvents
.error
, error
)
// Worker restart is gated on the `restartWorkerOnError` option.
// NOTE(review): the conditions preceding this one (numbering gap 1296-1299) are elided; confirm.
1300 this.opts
.restartWorkerOnError
=== true
// The replacement is created through the dynamic or the regular setup path depending on
// whether the failed node was dynamic (presumably an if/else; the `else` line is elided).
1302 if (workerNode
.info
.dynamic
) {
1303 this.createAndSetupDynamicWorkerNode()
1305 this.createAndSetupWorkerNode()
// On a started pool with tasks queueing enabled, hand the failed node's queued tasks to
// `redistributeQueuedTasks()`.
1308 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1309 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
// Terminate the failed worker node; a termination failure is emitted as a pool `error` event.
1311 workerNode
.terminate().catch(error
=> {
1312 this.emitter
?.emit(PoolEvents
.error
, error
)
1315 workerNode
.registerWorkerEventHandler(
1317 this.opts
.exitHandler
?? EMPTY_FUNCTION
// Registered once: on worker exit the node is removed from the pool worker nodes.
1319 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1320 this.removeWorkerNode(workerNode
)
// Add the node to the pool, run the post-setup hook, then return its key.
1322 const workerNodeKey
= this.addWorkerNode(workerNode
)
1323 this.afterWorkerNodeSetup(workerNodeKey
)
1324 return workerNodeKey
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * @returns New, completely set up dynamic worker node key.
 */
// Creates a worker node through `createAndSetupWorkerNode()`, registers a listener reacting to
// kill messages from the worker, replays the registered task functions to it, marks the node
// as dynamic and returns its key.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1337-1338, 1341, 1349, 1359-1360, 1365, 1367, 1373, 1376), so some call arguments, message
// payloads and `if (` headers are not visible here.
1332 protected createAndSetupDynamicWorkerNode (): number {
1333 const workerNodeKey
= this.createAndSetupWorkerNode()
1334 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1335 this.checkMessageWorkerId(message
)
// The worker node key is looked up again inside the listener rather than reusing the outer
// `workerNodeKey` (lookup argument elided in this listing).
1336 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1339 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1340 // Kill message received from worker
// Condition: a HARD kill, or a SOFT kill while the worker executes no task and, when tasks
// queueing is enabled, its tasks queue is also empty.
1342 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1343 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1344 ((this.opts
.enableTasksQueue
=== false &&
1345 workerUsage
.tasks
.executing
=== 0) ||
1346 (this.opts
.enableTasksQueue
=== true &&
1347 workerUsage
.tasks
.executing
=== 0 &&
1348 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1350 // Flag the worker node as not ready immediately
1351 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
// A destruction failure is emitted as a pool `error` event.
1352 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1353 this.emitter
?.emit(PoolEvents
.error
, error
)
1357 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
// NOTE(review): the payload of this message is elided in this listing.
1358 this.sendToWorker(workerNodeKey
, {
// Replay every registered task function to the new worker as an 'add' operation, with the
// function serialized through `toString()`.
1361 if (this.taskFunctions
.size
> 0) {
1362 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1363 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1364 taskFunctionOperation
: 'add',
1366 taskFunction
: taskFunction
.toString()
// NOTE(review): presumably the rejection handler of the call above (its opening line is
// elided); the error is emitted as a pool `error` event.
1368 this.emitter
?.emit(PoolEvents
.error
, error
)
1372 workerInfo
.dynamic
= true
// The node is flagged ready right away when the strategy policy has `dynamicWorkerReady` or
// `dynamicWorkerUsage` set (the enclosing `if (` line is elided; confirm).
1374 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1375 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1377 workerInfo
.ready
= true
1379 this.checkAndEmitDynamicWorkerCreationEvents()
1380 return workerNodeKey
/**
 * Attaches a message listener callback to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Attaches a one-shot message listener callback to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Detaches a message listener callback from the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
// Post-creation hook: registers `workerMessageListener` on the worker, sends it the startup
// and statistics messages and, when tasks queueing is enabled, subscribes the task stealing
// handlers on the worker node according to the tasks queue options.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1431, 1441, 1447), so the first listener argument and the worker node event names passed to
// `.on(` are not visible here.
1428 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1429 // Listen to worker messages.
1430 this.registerWorkerMessageListener(
1432 this.workerMessageListener
1434 // Send the startup message to worker.
1435 this.sendStartupMessageToWorker(workerNodeKey
)
1436 // Send the statistics message to worker.
1437 this.sendStatisticsMessageToWorker(workerNodeKey
)
1438 if (this.opts
.enableTasksQueue
=== true) {
// `taskStealing` option: subscribe `handleIdleWorkerNodeEvent` on the worker node.
1439 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1440 this.workerNodes
[workerNodeKey
].on(
1442 this.handleIdleWorkerNodeEvent
// `tasksStealingOnBackPressure` option: subscribe `handleBackPressureEvent` on the worker node.
1445 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1446 this.workerNodes
[workerNodeKey
].on(
1448 this.handleBackPressureEvent
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
// Sends to the worker a message whose content is derived from the worker choice strategy
// context task statistics requirements (`getTaskStatisticsRequirements()`).
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1468-1469, 1471, 1473), so the enclosing object keys and the property read after each
// `getTaskStatisticsRequirements()` call are not visible here; only the `elu` key is.
1466 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1467 this.sendToWorker(workerNodeKey
, {
1470 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1472 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
/**
 * Dispatches a task to a worker node: the task is executed right away when
 * `shallExecuteTask()` allows it, otherwise it is enqueued on that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
// Drains the tasks queue of the given worker node: while it still holds queued tasks, a
// destination worker node key is computed with a reduce over `workerNodes` and one task is
// dequeued from the source node and handed over together with that destination key.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1488-1489, 1496-1501), so the body of the `length <= 1` guard, the ternary branches and
// initial value of the reduce, and the name of the call receiving the dequeued task are not
// visible here.
1486 private redistributeQueuedTasks (workerNodeKey
: number): void {
// Guard on pools with at most one worker node (presumably an early return; body elided).
1487 if (this.workerNodes
.length
<= 1) {
1490 while (this.tasksQueueSize(workerNodeKey
) > 0) {
// The reducer compares each ready worker node's queued tasks count against the count of the
// node at the accumulated key. Note that its `workerNodeKey` parameter shadows the method's.
1491 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1492 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1493 return workerNode
.info
.ready
&&
1494 workerNode
.usage
.tasks
.queued
<
1495 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
// Arguments of the elided call: the destination key and the task dequeued from the source node.
1502 destinationWorkerNodeKey
,
1503 this.dequeueTask(workerNodeKey
) as Task
<Data
>
// Increments the `stolen` tasks counter of the given worker node and, when
// `shallUpdateTaskFunctionWorkerUsage()` holds and a usage exists for `taskName`, the `stolen`
// counter of that task function worker usage.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1510-1511, 1516, 1519, 1521-1522), so the `taskName` parameter declaration, the return type
// and the arguments of the last `getTaskFunctionWorkerUsage(` call are not visible here.
1508 private updateTaskStolenStatisticsWorkerUsage (
1509 workerNodeKey
: number,
1512 const workerNode
= this.workerNodes
[workerNodeKey
]
// Worker-level counter, only touched when the node and its usage exist.
1513 if (workerNode
?.usage
!= null) {
1514 ++workerNode
.usage
.tasks
.stolen
// Task-function-level counter.
// NOTE(review): `workerNode` is dereferenced here without the null guard used above; confirm
// that `shallUpdateTaskFunctionWorkerUsage()` cannot hold for a missing worker node.
1517 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1518 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1520 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1523 ++taskFunctionWorkerUsage
.tasks
.stolen
/**
 * Increments the sequentially stolen tasks counter of the worker node at the given key.
 * Does nothing when the worker node or its usage is missing.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const stealerWorkerNode = this.workerNodes[workerNodeKey]
  if (stealerWorkerNode?.usage == null) {
    return
  }
  stealerWorkerNode.usage.tasks.sequentiallyStolen += 1
}
// Increments the `sequentiallyStolen` counter of the task function worker usage for
// `taskName` on the given worker node, when `shallUpdateTaskFunctionWorkerUsage()` holds and
// such a usage exists.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1538-1539, 1541, 1544, 1546-1547), so the `taskName` parameter declaration, the return type,
// the `if (` header and the arguments of the last `getTaskFunctionWorkerUsage(` call are not
// visible here.
1536 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1537 workerNodeKey
: number,
1540 const workerNode
= this.workerNodes
[workerNodeKey
]
1542 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1543 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1545 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1548 ++taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node at the given key.
 * Does nothing when the worker node or its usage is missing.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const stealerWorkerNode = this.workerNodes[workerNodeKey]
  if (stealerWorkerNode?.usage == null) {
    return
  }
  stealerWorkerNode.usage.tasks.sequentiallyStolen = 0
}
// Resets to zero the `sequentiallyStolen` counter of the task function worker usage for
// `taskName` on the given worker node, when `shallUpdateTaskFunctionWorkerUsage()` holds and
// such a usage exists.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1563-1564, 1566, 1569, 1571-1572), so the `taskName` parameter declaration, the return type,
// the `if (` header and the arguments of the last `getTaskFunctionWorkerUsage(` call are not
// visible here.
1561 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1562 workerNodeKey
: number,
1565 const workerNode
= this.workerNodes
[workerNodeKey
]
1567 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1568 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1570 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1573 taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
= 0
// Worker node event handler (subscribed in `afterWorkerNodeSetup()` when the `taskStealing`
// option is enabled). It makes the worker node from the event detail steal a task through
// `workerNodeStealTask()`, maintains the sequentially stolen counters, then re-invokes itself
// after a delay computed from the `sequentiallyStolen` count.
// NOTE(review): this listing omits many original lines (gaps in the embedded numbering, e.g.
// 1580, 1582-1583, 1586, 1588-1589, 1591, 1596, 1600-1603, 1605-1606, 1608, 1610-1611, 1613,
// 1616, 1621, 1623, 1625-1626, 1628, 1630-1632, 1634, 1636-1637), so several guards, call
// arguments and early exits are not visible here; the comments below cover visible tokens only.
1577 private readonly handleIdleWorkerNodeEvent
= (
1578 eventDetail
: WorkerNodeEventDetail
,
1579 previousStolenTask
?: Task
<Data
>
// Guard on pools with at most one worker node (presumably an early return; body elided).
1581 if (this.workerNodes
.length
<= 1) {
1584 const { workerNodeKey
} = eventDetail
// A missing `workerNodeKey` in the event detail is reported with the message below
// (presumably thrown; the surrounding statement is elided).
1585 if (workerNodeKey
== null) {
1587 'WorkerNode event detail workerNodeKey attribute must be defined'
1590 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
// Condition: a task was stolen by the previous invocation, the sequentially stolen counter is
// positive and the worker node is executing or has queued tasks again.
1592 previousStolenTask
!= null &&
1593 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1594 (workerNodeTasksUsage
.executing
> 0 ||
1595 this.tasksQueueSize(workerNodeKey
) > 0)
// In that case the sequentially stolen counters are reset, per task function name and then
// at worker node level.
1597 for (const taskName
of this.workerNodes
[workerNodeKey
].info
1598 .taskFunctionNames
as string[]) {
1599 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1604 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
// Steal a task for this worker node; `workerNodeStealTask()` may return undefined.
1607 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1609 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1612 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1614 ].getTaskFunctionWorkerUsage(stolenTask
.name
as string)
1615 ?.tasks
as TaskStatistics
// The task function counter is updated when it is zero, or when the previous stolen task has
// the same name as this one and the counter is positive; the reset call below is presumably
// the alternative branch (the `else` line is elided).
1617 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1618 (previousStolenTask
!= null &&
1619 previousStolenTask
.name
=== stolenTask
.name
&&
1620 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1622 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1624 stolenTask
.name
as string
1627 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1629 stolenTask
.name
as string
// Wait for a delay computed by `exponentialDelay()` from the sequentially stolen count, then
// call this handler again with the stolen task as `previousStolenTask` (presumably from a
// `.then` callback, elided). Rejections are swallowed with EMPTY_FUNCTION.
1633 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1635 this.handleIdleWorkerNodeEvent(eventDetail
, stolenTask
)
1638 .catch(EMPTY_FUNCTION
)
// Makes the worker node at `workerNodeKey` steal one task: a source worker node is searched
// for, one task is popped from it and handed to `handleTask()` for the stealing node, then the
// stolen statistics are updated. The declared return type is `Task<Data> | undefined`.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1645-1646, 1649, 1655, 1661+), so the calls applied to `this.workerNodes` before the
// comparator, the arguments of the last call and the return statement are not visible here.
1641 private readonly workerNodeStealTask
= (
1642 workerNodeKey
: number
1643 ): Task
<Data
> | undefined => {
1644 const workerNodes
= this.workerNodes
// Comparator ordering worker nodes by queued tasks count, highest first (the call it is
// passed to is elided; presumably a sort on a copy).
1647 (workerNodeA
, workerNodeB
) =>
1648 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
// Source node: the first ready node with queued tasks whose index differs from `workerNodeKey`.
// NOTE(review): `sourceWorkerNodeKey` is an index into the local `workerNodes` list; if that
// list is a reordered copy, it is not comparable to a key of `this.workerNodes`. Confirm.
1650 const sourceWorkerNode
= workerNodes
.find(
1651 (sourceWorkerNode
, sourceWorkerNodeKey
) =>
1652 sourceWorkerNode
.info
.ready
&&
1653 sourceWorkerNodeKey
!== workerNodeKey
&&
1654 sourceWorkerNode
.usage
.tasks
.queued
> 0
1656 if (sourceWorkerNode
!= null) {
// Pop one task from the source node and hand it to the stealing node.
1657 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1658 this.handleTask(workerNodeKey
, task
)
1659 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1660 this.updateTaskStolenStatisticsWorkerUsage(
// Worker node event handler (subscribed in `afterWorkerNodeSetup()` when the
// `tasksStealingOnBackPressure` option is enabled). It moves tasks popped from the worker node
// named in the event detail to other ready worker nodes whose queued tasks count is below
// the configured tasks queue size minus `sizeOffset`.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1670, 1672-1673, 1677-1678, 1682-1683, 1686, 1688, 1694, 1698+), so guard bodies, the calls
// applied to `this.workerNodes` before the comparator and the arguments of the last call are
// not visible here.
1668 private readonly handleBackPressureEvent
= (
1669 eventDetail
: WorkerNodeEventDetail
// Guard on pools with at most one worker node (presumably an early return; body elided).
1671 if (this.workerNodes
.length
<= 1) {
1674 const { workerId
} = eventDetail
1675 const sizeOffset
= 1
// Guard on a configured tasks queue size not greater than `sizeOffset` (body elided).
1676 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1679 const sourceWorkerNode
=
1680 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1681 const workerNodes
= this.workerNodes
// Comparator ordering worker nodes by queued tasks count, lowest first (the call it is
// passed to is elided; presumably a sort on a copy).
1684 (workerNodeA
, workerNodeB
) =>
1685 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
// NOTE(review): `workerNodeKey` below is an index into the local `workerNodes` list and is
// passed to `handleTask()`; if that list is a reordered copy, the index may not designate the
// same node in `this.workerNodes`. Confirm.
1687 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
// Candidate condition: the source still has queued tasks, and the candidate is ready, is not
// the source worker and has fewer queued tasks than the configured size minus `sizeOffset`.
1689 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1690 workerNode
.info
.ready
&&
1691 workerNode
.info
.id
!== workerId
&&
1692 workerNode
.usage
.tasks
.queued
<
1693 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
// Pop one task from the source node and hand it to the candidate.
1695 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1696 this.handleTask(workerNodeKey
, task
)
1697 this.updateTaskStolenStatisticsWorkerUsage(
/**
 * This method is the message listener registered on each worker.
 */
// Routes a message received from a worker by its content: a ready response, a task execution
// response, or a task function names update.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1710, 1721, 1724-1726), so the arrow function header and the call wrapping
// `getWorkerNodeKeyByWorkerId(workerId)` in the last branch are not visible here.
1708 protected readonly workerMessageListener
= (
1709 message
: MessageValue
<Response
>
1711 this.checkMessageWorkerId(message
)
1712 const { workerId
, ready
, taskId
, taskFunctionNames
} = message
// The ready response is tested first because it also carries `taskFunctionNames`.
1713 if (ready
!= null && taskFunctionNames
!= null) {
1714 // Worker ready response received from worker
1715 this.handleWorkerReadyResponse(message
)
1716 } else if (taskId
!= null) {
1717 // Task execution response received from worker
1718 this.handleTaskExecutionResponse(message
)
1719 } else if (taskFunctionNames
!= null) {
1720 // Task function names message received from worker
// Stores the received names in the `taskFunctionNames` property of an object resolved from the
// worker node key (presumably the worker info; the resolving call is elided).
1722 this.getWorkerNodeKeyByWorkerId(workerId
)
1723 ).taskFunctionNames
= taskFunctionNames
/**
 * Handles the ready response sent by a worker: records its readiness and task function names
 * in the worker info, then emits the pool `ready` event the first time the pool is ready.
 *
 * @param message - The ready response message received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it failed to initialize.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const readyWorkerInfo = this.getWorkerInfo(readyWorkerNodeKey)
  readyWorkerInfo.ready = ready as boolean
  readyWorkerInfo.taskFunctionNames = taskFunctionNames
  // The pool readiness is evaluated after the worker info update above.
  const shallEmitReadyEvent = !this.readyEventEmitted && this.ready
  if (shallEmitReadyEvent) {
    this.readyEventEmitted = true
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
// Handles a task execution response: settles the promise stored in `promiseResponseMap` under
// the message task id, runs the post-execution bookkeeping and, when tasks queueing is
// enabled, deals with the worker node tasks queue and idle state.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1752-1755, 1757, 1761, 1769, 1773-1775, 1777-1779, 1783, 1786+), so the arguments of the
// rejecting `runInAsyncScope(` call, the `else` line, the `if (` headers and the name of the
// call receiving the dequeued task are not visible here.
1743 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1744 const { workerId
, taskId
, workerError
, data
} = message
1745 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
// A response whose task id has no entry in the map is ignored.
1746 if (promiseResponse
!= null) {
1747 const { resolve
, reject
, workerNodeKey
, asyncResource
} = promiseResponse
// Worker error: emit the pool `taskError` event, then reject, through the async resource
// scope when one is attached. Without async resource the rejection value is
// `workerError.message`.
1748 if (workerError
!= null) {
1749 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1750 asyncResource
!= null
1751 ? asyncResource
.runInAsyncScope(
1756 : reject(workerError
.message
)
// No worker error (presumably the `else` branch): resolve with the response data, through the
// async resource scope when one is attached.
1758 asyncResource
!= null
1759 ? asyncResource
.runInAsyncScope(resolve
, this.emitter
, data
)
1760 : resolve(data
as Response
)
// Bookkeeping after settlement: async resource destroy hook, task execution hook, strategy
// context update, map entry removal and `taskFinished` event on the worker node.
1762 asyncResource
?.emitDestroy()
1763 this.afterTaskExecutionHook(workerNodeKey
, message
)
1764 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1765 this.promiseResponseMap
.delete(taskId
as string)
1766 this.workerNodes
[workerNodeKey
].emit('taskFinished', taskId
)
1767 if (this.opts
.enableTasksQueue
=== true) {
1768 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
// Condition: the queue is not empty and the executing tasks count is below the configured
// `concurrency`; a task is then dequeued (the call receiving it is elided).
1770 this.tasksQueueSize(workerNodeKey
) > 0 &&
1771 workerNodeTasksUsage
.executing
<
1772 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1776 this.dequeueTask(workerNodeKey
) as Task
<Data
>
// Condition: nothing executing, empty queue and no sequentially stolen task; the worker node
// then emits `idleWorkerNode` with an event detail carrying the worker id.
1780 workerNodeTasksUsage
.executing
=== 0 &&
1781 this.tasksQueueSize(workerNodeKey
) === 0 &&
1782 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1784 this.workerNodes
[workerNodeKey
].emit('idleWorkerNode', {
1785 workerId
: workerId
as number,
// Emits the pool `busy` event with the pool info.
// NOTE(review): the line between the signature and the emit (embedded number 1794) is elided
// in this listing; it presumably holds the condition guarding the emission. Confirm.
1793 private checkAndEmitTaskExecutionEvents (): void {
1795 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
/**
 * Emits the pool `backPressure` event, with the pool info, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
// For a dynamic pool, emits the pool `full` event with the pool info.
// NOTE(review): the line between the type check and the emit (embedded number 1807) is elided
// in this listing; it presumably holds a further condition guarding the emission. Confirm.
1805 private checkAndEmitDynamicWorkerCreationEvents (): void {
1806 if (this.type === PoolTypes
.dynamic
) {
1808 this.emitter
?.emit(PoolEvents
.full
, this.info
)
/**
 * Retrieves the information of the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  return targetWorkerNode?.info
}
/**
 * Creates a worker node.
 *
 * @returns The created worker node.
 */
// Instantiates a `WorkerNode` from the pool options and flags it as ready when the pool is
// starting.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1830-1833, 1837-1838, 1842+), so the leading constructor arguments, the start of the
// options object and the return statement are not visible here.
1828 private createWorkerNode (): IWorkerNode
<Worker
, Data
> {
1829 const workerNode
= new WorkerNode
<Worker
, Data
>(
1834 workerOptions
: this.opts
.workerOptions
,
// The back pressure size defaults to the square of `maxSize` when no tasks queue size is
// configured.
1835 tasksQueueBackPressureSize
:
1836 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1839 // Flag the worker node as ready at pool startup.
1840 if (this.starting
) {
1841 workerNode
.info
.ready
= true
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
/**
 * Removes the given worker node from the pool worker nodes and from the worker choice
 * strategy context. Does nothing when the worker node is not part of the pool.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
/**
 * Flags the worker node at the given key as not ready.
 * Does nothing when no worker node exists at that key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  // `getWorkerInfo()` reads the worker node with optional chaining, so at runtime it yields
  // `undefined` for a key with no worker node (e.g. a node already removed from the pool).
  // Guard the write instead of throwing a TypeError from within a message listener.
  if (workerInfo != null) {
    workerInfo.ready = false
  }
}
/**
 * Tells whether the worker node at the given key has back pressure.
 * Always false when tasks queueing is not enabled.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if tasks queueing is enabled and the worker node has back pressure, `false` otherwise.
 */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
// Pool-level back pressure check: requires tasks queueing to be enabled and searches for the
// index of the first worker node that does not have back pressure.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1888, 1892+), so the `return` keyword and the comparison applied to the `findIndex()` result
// are not visible here; presumably the pool has back pressure when no such node is found.
1887 private hasBackPressure (): boolean {
1889 this.opts
.enableTasksQueue
=== true &&
1890 this.workerNodes
.findIndex(
1891 workerNode
=> !workerNode
.hasBackPressure()
/**
 * Runs the given task on the worker identified by its worker node key: calls the before
 * execution hook, sends the task to the worker with its transfer list, then checks for
 * task execution events to emit.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // Read after the hook, as the original does.
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node at the given key, then checks for task
 * queuing events to emit.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const queriedWorkerNode = this.workerNodes[workerNodeKey]
  return queriedWorkerNode.tasksQueueSize()
}
// Flushes the tasks queue of the worker node at the given key: tasks are dequeued one by one
// while the queue is not empty, then the worker node tasks queue is cleared.
// NOTE(review): this listing omits some original lines (gaps in the embedded numbering, e.g.
// 1924-1925, 1927-1928), so the name and first argument of the call receiving each dequeued
// task are not visible here.
1922 protected flushTasksQueue (workerNodeKey
: number): void {
1923 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1926 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1929 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1932 private flushTasksQueues (): void {
1933 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1934 this.flushTasksQueue(workerNodeKey
)