1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
5 import { AsyncResource
} from
'node:async_hooks'
8 PromiseResponseWrapper
,
10 } from
'../utility-types'
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
25 import { KillBehaviors
} from
'../worker/worker-options'
26 import type { TaskFunction
} from
'../worker/task-functions'
34 type TasksQueueOptions
41 WorkerNodeEventDetail
,
46 type MeasurementStatisticsRequirements
,
48 WorkerChoiceStrategies
,
49 type WorkerChoiceStrategy
,
50 type WorkerChoiceStrategyOptions
51 } from
'./selection-strategies/selection-strategies-types'
52 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
53 import { version
} from
'./version'
54 import { WorkerNode
} from
'./worker-node'
57 checkValidTasksQueueOptions
,
58 checkValidWorkerChoiceStrategy
,
59 updateMeasurementStatistics
63 * Base class that implements some shared logic for all poolifier pools.
65 * @typeParam Worker - Type of worker which manages this pool.
66 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
67 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
69 export abstract class AbstractPool
<
70 Worker
extends IWorker
,
73 > implements IPool
<Worker
, Data
, Response
> {
75 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
78 public emitter
?: EventEmitterAsyncResource
81 * Dynamic pool maximum size property placeholder.
83 protected readonly max
?: number
86 * The task execution response promise map:
87 * - `key`: The message id of each submitted task.
88 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
90 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
92 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
93 new Map
<string, PromiseResponseWrapper
<Response
>>()
96 * Worker choice strategy context referencing a worker choice algorithm implementation.
98 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
105 * The task functions added at runtime map:
106 * - `key`: The task function name.
107 * - `value`: The task function itself.
109 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
112 * Whether the pool is started or not.
114 private started
: boolean
116 * Whether the pool is starting or not.
118 private starting
: boolean
120 * Whether the pool is destroying or not.
122 private destroying
: boolean
124 * Whether the pool ready event has been emitted or not.
126 private readyEventEmitted
: boolean
128 * The start timestamp of the pool.
130 private readonly startTimestamp
133 * Constructs a new poolifier pool.
135 * @param numberOfWorkers - Number of workers that this pool should manage.
136 * @param filePath - Path to the worker file.
137 * @param opts - Options for the pool.
140 protected readonly numberOfWorkers
: number,
141 protected readonly filePath
: string,
142 protected readonly opts
: PoolOptions
<Worker
>
144 if (!this.isMain()) {
146 'Cannot start a pool from a worker with the same type as the pool'
149 checkFilePath(this.filePath
)
150 this.checkNumberOfWorkers(this.numberOfWorkers
)
151 this.checkPoolOptions(this.opts
)
153 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
154 this.executeTask
= this.executeTask
.bind(this)
155 this.enqueueTask
= this.enqueueTask
.bind(this)
157 if (this.opts
.enableEvents
=== true) {
158 this.initializeEventEmitter()
160 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
166 this.opts
.workerChoiceStrategy
,
167 this.opts
.workerChoiceStrategyOptions
172 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
175 this.starting
= false
176 this.destroying
= false
177 this.readyEventEmitted
= false
178 if (this.opts
.startWorkers
=== true) {
182 this.startTimestamp
= performance
.now()
185 private checkNumberOfWorkers (numberOfWorkers
: number): void {
186 if (numberOfWorkers
== null) {
188 'Cannot instantiate a pool without specifying the number of workers'
190 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
192 'Cannot instantiate a pool with a non safe integer number of workers'
194 } else if (numberOfWorkers
< 0) {
195 throw new RangeError(
196 'Cannot instantiate a pool with a negative number of workers'
198 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
199 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
203 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
204 if (isPlainObject(opts
)) {
205 this.opts
.startWorkers
= opts
.startWorkers
?? true
206 checkValidWorkerChoiceStrategy(
207 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
209 this.opts
.workerChoiceStrategy
=
210 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
211 this.checkValidWorkerChoiceStrategyOptions(
212 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
214 this.opts
.workerChoiceStrategyOptions
= {
215 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
216 ...opts
.workerChoiceStrategyOptions
218 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
219 this.opts
.enableEvents
= opts
.enableEvents
?? true
220 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
221 if (this.opts
.enableTasksQueue
) {
222 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
223 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
224 opts
.tasksQueueOptions
as TasksQueueOptions
228 throw new TypeError('Invalid pool options: must be a plain object')
232 private checkValidWorkerChoiceStrategyOptions (
233 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
236 workerChoiceStrategyOptions
!= null &&
237 !isPlainObject(workerChoiceStrategyOptions
)
240 'Invalid worker choice strategy options: must be a plain object'
244 workerChoiceStrategyOptions
?.retries
!= null &&
245 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
248 'Invalid worker choice strategy options: retries must be an integer'
252 workerChoiceStrategyOptions
?.retries
!= null &&
253 workerChoiceStrategyOptions
.retries
< 0
255 throw new RangeError(
256 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
260 workerChoiceStrategyOptions
?.weights
!= null &&
261 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
264 'Invalid worker choice strategy options: must have a weight for each worker node'
268 workerChoiceStrategyOptions
?.measurement
!= null &&
269 !Object.values(Measurements
).includes(
270 workerChoiceStrategyOptions
.measurement
274 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
279 private initializeEventEmitter (): void {
280 this.emitter
= new EventEmitterAsyncResource({
281 name
: `poolifier:${this.type}-${this.worker}-pool`
286 public get
info (): PoolInfo
{
291 started
: this.started
,
293 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
294 minSize
: this.minSize
,
295 maxSize
: this.maxSize
,
296 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
297 .runTime
.aggregate
&&
298 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
299 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
300 workerNodes
: this.workerNodes
.length
,
301 idleWorkerNodes
: this.workerNodes
.reduce(
302 (accumulator
, workerNode
) =>
303 workerNode
.usage
.tasks
.executing
=== 0
308 busyWorkerNodes
: this.workerNodes
.reduce(
309 (accumulator
, _workerNode
, workerNodeKey
) =>
310 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
313 executedTasks
: this.workerNodes
.reduce(
314 (accumulator
, workerNode
) =>
315 accumulator
+ workerNode
.usage
.tasks
.executed
,
318 executingTasks
: this.workerNodes
.reduce(
319 (accumulator
, workerNode
) =>
320 accumulator
+ workerNode
.usage
.tasks
.executing
,
323 ...(this.opts
.enableTasksQueue
=== true && {
324 queuedTasks
: this.workerNodes
.reduce(
325 (accumulator
, workerNode
) =>
326 accumulator
+ workerNode
.usage
.tasks
.queued
,
330 ...(this.opts
.enableTasksQueue
=== true && {
331 maxQueuedTasks
: this.workerNodes
.reduce(
332 (accumulator
, workerNode
) =>
333 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
337 ...(this.opts
.enableTasksQueue
=== true && {
338 backPressure
: this.hasBackPressure()
340 ...(this.opts
.enableTasksQueue
=== true && {
341 stolenTasks
: this.workerNodes
.reduce(
342 (accumulator
, workerNode
) =>
343 accumulator
+ workerNode
.usage
.tasks
.stolen
,
347 failedTasks
: this.workerNodes
.reduce(
348 (accumulator
, workerNode
) =>
349 accumulator
+ workerNode
.usage
.tasks
.failed
,
352 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
353 .runTime
.aggregate
&& {
357 ...this.workerNodes
.map(
358 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
364 ...this.workerNodes
.map(
365 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
369 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
370 .runTime
.average
&& {
373 this.workerNodes
.reduce
<number[]>(
374 (accumulator
, workerNode
) =>
375 accumulator
.concat(workerNode
.usage
.runTime
.history
),
381 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
385 this.workerNodes
.reduce
<number[]>(
386 (accumulator
, workerNode
) =>
387 accumulator
.concat(workerNode
.usage
.runTime
.history
),
395 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
396 .waitTime
.aggregate
&& {
400 ...this.workerNodes
.map(
401 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
407 ...this.workerNodes
.map(
408 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
412 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
413 .waitTime
.average
&& {
416 this.workerNodes
.reduce
<number[]>(
417 (accumulator
, workerNode
) =>
418 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
424 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
425 .waitTime
.median
&& {
428 this.workerNodes
.reduce
<number[]>(
429 (accumulator
, workerNode
) =>
430 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
442 * The pool readiness boolean status.
444 private get
ready (): boolean {
446 this.workerNodes
.reduce(
447 (accumulator
, workerNode
) =>
448 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
457 * The approximate pool utilization.
459 * @returns The pool utilization.
461 private get
utilization (): number {
462 const poolTimeCapacity
=
463 (performance
.now() - this.startTimestamp
) * this.maxSize
464 const totalTasksRunTime
= this.workerNodes
.reduce(
465 (accumulator
, workerNode
) =>
466 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
469 const totalTasksWaitTime
= this.workerNodes
.reduce(
470 (accumulator
, workerNode
) =>
471 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
474 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
480 * If it is `'dynamic'`, it provides the `max` property.
482 protected abstract get
type (): PoolType
487 protected abstract get
worker (): WorkerType
490 * The pool minimum size.
492 protected get
minSize (): number {
493 return this.numberOfWorkers
497 * The pool maximum size.
499 protected get
maxSize (): number {
500 return this.max
?? this.numberOfWorkers
504 * Checks if the worker id sent in the received message from a worker is valid.
506 * @param message - The received message.
507 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
509 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
510 if (message
.workerId
== null) {
511 throw new Error('Worker message received without worker id')
512 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
514 `Worker message received from unknown worker '${message.workerId}'`
520 * Gets the given worker its worker node key.
522 * @param worker - The worker.
523 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
525 private getWorkerNodeKeyByWorker (worker
: Worker
): number {
526 return this.workerNodes
.findIndex(
527 workerNode
=> workerNode
.worker
=== worker
532 * Gets the worker node key given its worker id.
534 * @param workerId - The worker id.
535 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
537 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
538 return this.workerNodes
.findIndex(
539 workerNode
=> workerNode
.info
.id
=== workerId
544 public setWorkerChoiceStrategy (
545 workerChoiceStrategy
: WorkerChoiceStrategy
,
546 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
548 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
549 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
550 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
551 this.opts
.workerChoiceStrategy
553 if (workerChoiceStrategyOptions
!= null) {
554 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
556 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
557 workerNode
.resetUsage()
558 this.sendStatisticsMessageToWorker(workerNodeKey
)
563 public setWorkerChoiceStrategyOptions (
564 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
566 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
567 this.opts
.workerChoiceStrategyOptions
= {
568 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
569 ...workerChoiceStrategyOptions
571 this.workerChoiceStrategyContext
.setOptions(
572 this.opts
.workerChoiceStrategyOptions
577 public enableTasksQueue (
579 tasksQueueOptions
?: TasksQueueOptions
581 if (this.opts
.enableTasksQueue
=== true && !enable
) {
582 this.unsetTaskStealing()
583 this.unsetTasksStealingOnBackPressure()
584 this.flushTasksQueues()
586 this.opts
.enableTasksQueue
= enable
587 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
591 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
592 if (this.opts
.enableTasksQueue
=== true) {
593 checkValidTasksQueueOptions(tasksQueueOptions
)
594 this.opts
.tasksQueueOptions
=
595 this.buildTasksQueueOptions(tasksQueueOptions
)
596 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
597 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
598 this.unsetTaskStealing()
599 this.setTaskStealing()
601 this.unsetTaskStealing()
603 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
604 this.unsetTasksStealingOnBackPressure()
605 this.setTasksStealingOnBackPressure()
607 this.unsetTasksStealingOnBackPressure()
609 } else if (this.opts
.tasksQueueOptions
!= null) {
610 delete this.opts
.tasksQueueOptions
614 private buildTasksQueueOptions (
615 tasksQueueOptions
: TasksQueueOptions
616 ): TasksQueueOptions
{
619 size
: Math.pow(this.maxSize
, 2),
622 tasksStealingOnBackPressure
: true
628 private setTasksQueueSize (size
: number): void {
629 for (const workerNode
of this.workerNodes
) {
630 workerNode
.tasksQueueBackPressureSize
= size
634 private setTaskStealing (): void {
635 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
636 this.workerNodes
[workerNodeKey
].on(
638 this.handleIdleWorkerNodeEvent
643 private unsetTaskStealing (): void {
644 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
645 this.workerNodes
[workerNodeKey
].off(
647 this.handleIdleWorkerNodeEvent
652 private setTasksStealingOnBackPressure (): void {
653 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
654 this.workerNodes
[workerNodeKey
].on(
656 this.handleBackPressureEvent
661 private unsetTasksStealingOnBackPressure (): void {
662 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
663 this.workerNodes
[workerNodeKey
].off(
665 this.handleBackPressureEvent
671 * Whether the pool is full or not.
673 * The pool filling boolean status.
675 protected get
full (): boolean {
676 return this.workerNodes
.length
>= this.maxSize
680 * Whether the pool is busy or not.
682 * The pool busyness boolean status.
684 protected abstract get
busy (): boolean
687 * Whether worker nodes are executing concurrently their tasks quota or not.
689 * @returns Worker nodes busyness boolean status.
691 protected internalBusy (): boolean {
692 if (this.opts
.enableTasksQueue
=== true) {
694 this.workerNodes
.findIndex(
696 workerNode
.info
.ready
&&
697 workerNode
.usage
.tasks
.executing
<
698 (this.opts
.tasksQueueOptions
?.concurrency
as number)
703 this.workerNodes
.findIndex(
705 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
710 private isWorkerNodeBusy (workerNodeKey
: number): boolean {
711 if (this.opts
.enableTasksQueue
=== true) {
713 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
>=
714 (this.opts
.tasksQueueOptions
?.concurrency
as number)
717 return this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
> 0
720 private async sendTaskFunctionOperationToWorker (
721 workerNodeKey
: number,
722 message
: MessageValue
<Data
>
723 ): Promise
<boolean> {
724 return await new Promise
<boolean>((resolve
, reject
) => {
725 const taskFunctionOperationListener
= (
726 message
: MessageValue
<Response
>
728 this.checkMessageWorkerId(message
)
729 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
731 message
.taskFunctionOperationStatus
!= null &&
732 message
.workerId
=== workerId
734 if (message
.taskFunctionOperationStatus
) {
736 } else if (!message
.taskFunctionOperationStatus
) {
739 `Task function operation '${
740 message.taskFunctionOperation as string
741 }' failed on worker ${message.workerId} with error: '${
742 message.workerError?.message as string
747 this.deregisterWorkerMessageListener(
748 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
749 taskFunctionOperationListener
753 this.registerWorkerMessageListener(
755 taskFunctionOperationListener
757 this.sendToWorker(workerNodeKey
, message
)
761 private async sendTaskFunctionOperationToWorkers (
762 message
: MessageValue
<Data
>
763 ): Promise
<boolean> {
764 return await new Promise
<boolean>((resolve
, reject
) => {
765 const responsesReceived
= new Array<MessageValue
<Response
>>()
766 const taskFunctionOperationsListener
= (
767 message
: MessageValue
<Response
>
769 this.checkMessageWorkerId(message
)
770 if (message
.taskFunctionOperationStatus
!= null) {
771 responsesReceived
.push(message
)
772 if (responsesReceived
.length
=== this.workerNodes
.length
) {
774 responsesReceived
.every(
775 message
=> message
.taskFunctionOperationStatus
=== true
780 responsesReceived
.some(
781 message
=> message
.taskFunctionOperationStatus
=== false
784 const errorResponse
= responsesReceived
.find(
785 response
=> response
.taskFunctionOperationStatus
=== false
789 `Task function operation '${
790 message.taskFunctionOperation as string
791 }' failed on worker ${
792 errorResponse?.workerId as number
794 errorResponse?.workerError?.message as string
799 this.deregisterWorkerMessageListener(
800 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
801 taskFunctionOperationsListener
806 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
807 this.registerWorkerMessageListener(
809 taskFunctionOperationsListener
811 this.sendToWorker(workerNodeKey
, message
)
817 public hasTaskFunction (name
: string): boolean {
818 for (const workerNode
of this.workerNodes
) {
820 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
821 workerNode
.info
.taskFunctionNames
.includes(name
)
830 public async addTaskFunction (
832 fn
: TaskFunction
<Data
, Response
>
833 ): Promise
<boolean> {
834 if (typeof name
!== 'string') {
835 throw new TypeError('name argument must be a string')
837 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
838 throw new TypeError('name argument must not be an empty string')
840 if (typeof fn
!== 'function') {
841 throw new TypeError('fn argument must be a function')
843 const opResult
= await this.sendTaskFunctionOperationToWorkers({
844 taskFunctionOperation
: 'add',
845 taskFunctionName
: name
,
846 taskFunction
: fn
.toString()
848 this.taskFunctions
.set(name
, fn
)
853 public async removeTaskFunction (name
: string): Promise
<boolean> {
854 if (!this.taskFunctions
.has(name
)) {
856 'Cannot remove a task function not handled on the pool side'
859 const opResult
= await this.sendTaskFunctionOperationToWorkers({
860 taskFunctionOperation
: 'remove',
861 taskFunctionName
: name
863 this.deleteTaskFunctionWorkerUsages(name
)
864 this.taskFunctions
.delete(name
)
869 public listTaskFunctionNames (): string[] {
870 for (const workerNode
of this.workerNodes
) {
872 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
873 workerNode
.info
.taskFunctionNames
.length
> 0
875 return workerNode
.info
.taskFunctionNames
882 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
883 return await this.sendTaskFunctionOperationToWorkers({
884 taskFunctionOperation
: 'default',
885 taskFunctionName
: name
889 private deleteTaskFunctionWorkerUsages (name
: string): void {
890 for (const workerNode
of this.workerNodes
) {
891 workerNode
.deleteTaskFunctionWorkerUsage(name
)
895 private shallExecuteTask (workerNodeKey
: number): boolean {
897 this.tasksQueueSize(workerNodeKey
) === 0 &&
898 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
899 (this.opts
.tasksQueueOptions
?.concurrency
as number)
904 public async execute (
907 transferList
?: TransferListItem
[]
908 ): Promise
<Response
> {
909 return await new Promise
<Response
>((resolve
, reject
) => {
911 reject(new Error('Cannot execute a task on not started pool'))
914 if (this.destroying
) {
915 reject(new Error('Cannot execute a task on destroying pool'))
918 if (name
!= null && typeof name
!== 'string') {
919 reject(new TypeError('name argument must be a string'))
924 typeof name
=== 'string' &&
925 name
.trim().length
=== 0
927 reject(new TypeError('name argument must not be an empty string'))
930 if (transferList
!= null && !Array.isArray(transferList
)) {
931 reject(new TypeError('transferList argument must be an array'))
934 const timestamp
= performance
.now()
935 const workerNodeKey
= this.chooseWorkerNode()
936 const task
: Task
<Data
> = {
937 name
: name
?? DEFAULT_TASK_NAME
,
938 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
939 data
: data
?? ({} as Data
),
944 this.promiseResponseMap
.set(task
.taskId
as string, {
948 ...(this.emitter
!= null && {
949 asyncResource
: new AsyncResource('poolifier:task', {
950 triggerAsyncId
: this.emitter
.asyncId
,
951 requireManualDestroy
: true
956 this.opts
.enableTasksQueue
=== false ||
957 (this.opts
.enableTasksQueue
=== true &&
958 this.shallExecuteTask(workerNodeKey
))
960 this.executeTask(workerNodeKey
, task
)
962 this.enqueueTask(workerNodeKey
, task
)
968 public start (): void {
970 throw new Error('Cannot start an already started pool')
973 throw new Error('Cannot start an already starting pool')
975 if (this.destroying
) {
976 throw new Error('Cannot start a destroying pool')
980 this.workerNodes
.reduce(
981 (accumulator
, workerNode
) =>
982 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
984 ) < this.numberOfWorkers
986 this.createAndSetupWorkerNode()
988 this.starting
= false
993 public async destroy (): Promise
<void> {
995 throw new Error('Cannot destroy an already destroyed pool')
998 throw new Error('Cannot destroy an starting pool')
1000 if (this.destroying
) {
1001 throw new Error('Cannot destroy an already destroying pool')
1003 this.destroying
= true
1005 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
1006 await this.destroyWorkerNode(workerNodeKey
)
1009 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
1010 this.emitter
?.emitDestroy()
1011 this.emitter
?.removeAllListeners()
1012 this.readyEventEmitted
= false
1013 this.destroying
= false
1014 this.started
= false
1017 protected async sendKillMessageToWorker (
1018 workerNodeKey
: number
1020 await new Promise
<void>((resolve
, reject
) => {
1021 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1022 this.checkMessageWorkerId(message
)
1023 if (message
.kill
=== 'success') {
1025 } else if (message
.kill
=== 'failure') {
1028 `Kill message handling failed on worker ${
1029 message.workerId as number
1035 // FIXME: should be registered only once
1036 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1037 this.sendToWorker(workerNodeKey
, { kill
: true })
1042 * Terminates the worker node given its worker node key.
1044 * @param workerNodeKey - The worker node key.
1046 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1047 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1048 this.flushTasksQueue(workerNodeKey
)
1049 // FIXME: wait for tasks to be finished
1050 const workerNode
= this.workerNodes
[workerNodeKey
]
1051 await this.sendKillMessageToWorker(workerNodeKey
)
1052 await workerNode
.terminate()
1056 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1057 * Can be overridden.
1061 protected setupHook (): void {
1062 /* Intentionally empty */
1066 * Should return whether the worker is the main worker or not.
1068 protected abstract isMain (): boolean
1071 * Hook executed before the worker task execution.
1072 * Can be overridden.
1074 * @param workerNodeKey - The worker node key.
1075 * @param task - The task to execute.
1077 protected beforeTaskExecutionHook (
1078 workerNodeKey
: number,
1081 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1082 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1083 ++workerUsage
.tasks
.executing
1084 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1087 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1088 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1092 const taskFunctionWorkerUsage
= this.workerNodes
[
1094 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1095 ++taskFunctionWorkerUsage
.tasks
.executing
1096 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1101 * Hook executed after the worker task execution.
1102 * Can be overridden.
1104 * @param workerNodeKey - The worker node key.
1105 * @param message - The received message.
1107 protected afterTaskExecutionHook (
1108 workerNodeKey
: number,
1109 message
: MessageValue
<Response
>
1111 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1112 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1113 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1114 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1115 this.updateEluWorkerUsage(workerUsage
, message
)
1118 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1119 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1120 message
.taskPerformance
?.name
as string
1123 const taskFunctionWorkerUsage
= this.workerNodes
[
1125 ].getTaskFunctionWorkerUsage(
1126 message
.taskPerformance
?.name
as string
1128 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1129 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1130 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
1135 * Whether the worker node shall update its task function worker usage or not.
1137 * @param workerNodeKey - The worker node key.
1138 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1140 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1141 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1143 workerInfo
!= null &&
1144 Array.isArray(workerInfo
.taskFunctionNames
) &&
1145 workerInfo
.taskFunctionNames
.length
> 2
1149 private updateTaskStatisticsWorkerUsage (
1150 workerUsage
: WorkerUsage
,
1151 message
: MessageValue
<Response
>
1153 const workerTaskStatistics
= workerUsage
.tasks
1155 workerTaskStatistics
.executing
!= null &&
1156 workerTaskStatistics
.executing
> 0
1158 --workerTaskStatistics
.executing
1160 if (message
.workerError
== null) {
1161 ++workerTaskStatistics
.executed
1163 ++workerTaskStatistics
.failed
1167 private updateRunTimeWorkerUsage (
1168 workerUsage
: WorkerUsage
,
1169 message
: MessageValue
<Response
>
1171 if (message
.workerError
!= null) {
1174 updateMeasurementStatistics(
1175 workerUsage
.runTime
,
1176 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1177 message
.taskPerformance
?.runTime
?? 0
1181 private updateWaitTimeWorkerUsage (
1182 workerUsage
: WorkerUsage
,
1185 const timestamp
= performance
.now()
1186 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1187 updateMeasurementStatistics(
1188 workerUsage
.waitTime
,
1189 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1194 private updateEluWorkerUsage (
1195 workerUsage
: WorkerUsage
,
1196 message
: MessageValue
<Response
>
1198 if (message
.workerError
!= null) {
1201 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1202 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1203 updateMeasurementStatistics(
1204 workerUsage
.elu
.active
,
1205 eluTaskStatisticsRequirements
,
1206 message
.taskPerformance
?.elu
?.active
?? 0
1208 updateMeasurementStatistics(
1209 workerUsage
.elu
.idle
,
1210 eluTaskStatisticsRequirements
,
1211 message
.taskPerformance
?.elu
?.idle
?? 0
1213 if (eluTaskStatisticsRequirements
.aggregate
) {
1214 if (message
.taskPerformance
?.elu
!= null) {
1215 if (workerUsage
.elu
.utilization
!= null) {
1216 workerUsage
.elu
.utilization
=
1217 (workerUsage
.elu
.utilization
+
1218 message
.taskPerformance
.elu
.utilization
) /
1221 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
1228 * Chooses a worker node for the next task.
1230 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1232 * @returns The chosen worker node key
1234 private chooseWorkerNode (): number {
1235 if (this.shallCreateDynamicWorker()) {
1236 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1238 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1240 return workerNodeKey
1243 return this.workerChoiceStrategyContext
.execute()
1247 * Conditions for dynamic worker creation.
1249 * @returns Whether to create a dynamic worker or not.
1251 private shallCreateDynamicWorker (): boolean {
1252 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
1256 * Sends a message to worker given its worker node key.
1258 * @param workerNodeKey - The worker node key.
1259 * @param message - The message.
1260 * @param transferList - The optional array of transferable objects.
1262 protected abstract sendToWorker (
1263 workerNodeKey
: number,
1264 message
: MessageValue
<Data
>,
1265 transferList
?: TransferListItem
[]
1269 * Creates a new, completely set up worker node.
1271 * @returns New, completely set up worker node key.
1273 protected createAndSetupWorkerNode (): number {
1274 const workerNode
= this.createWorkerNode()
1275 workerNode
.registerWorkerEventHandler(
1277 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1279 workerNode
.registerWorkerEventHandler(
1281 this.opts
.messageHandler
?? EMPTY_FUNCTION
1283 workerNode
.registerWorkerEventHandler(
1285 this.opts
.errorHandler
?? EMPTY_FUNCTION
1287 workerNode
.registerWorkerEventHandler('error', (error
: Error) => {
1288 const workerNodeKey
= this.getWorkerNodeKeyByWorker(workerNode
.worker
)
1289 workerNode
.info
.ready
= false
1290 this.emitter
?.emit(PoolEvents
.error
, error
)
1295 this.opts
.restartWorkerOnError
=== true
1297 if (workerNode
.info
.dynamic
) {
1298 this.createAndSetupDynamicWorkerNode()
1300 this.createAndSetupWorkerNode()
1303 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1304 this.redistributeQueuedTasks(workerNodeKey
)
1306 workerNode
.terminate().catch(error
=> {
1307 this.emitter
?.emit(PoolEvents
.error
, error
)
1310 workerNode
.registerWorkerEventHandler(
1312 this.opts
.exitHandler
?? EMPTY_FUNCTION
1314 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1315 this.removeWorkerNode(workerNode
.worker
)
1317 const workerNodeKey
= this.addWorkerNode(workerNode
)
1318 this.afterWorkerNodeSetup(workerNodeKey
)
1319 return workerNodeKey
1323 * Creates a new, completely set up dynamic worker node.
1325 * @returns New, completely set up dynamic worker node key.
1327 protected createAndSetupDynamicWorkerNode (): number {
1328 const workerNodeKey
= this.createAndSetupWorkerNode()
1329 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1330 this.checkMessageWorkerId(message
)
1331 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1334 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1335 // Kill message received from worker
1337 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1338 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1339 ((this.opts
.enableTasksQueue
=== false &&
1340 workerUsage
.tasks
.executing
=== 0) ||
1341 (this.opts
.enableTasksQueue
=== true &&
1342 workerUsage
.tasks
.executing
=== 0 &&
1343 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1345 // Flag the worker node as not ready immediately
1346 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1347 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1348 this.emitter
?.emit(PoolEvents
.error
, error
)
1352 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1353 this.sendToWorker(workerNodeKey
, {
1356 if (this.taskFunctions
.size
> 0) {
1357 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1358 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1359 taskFunctionOperation
: 'add',
1361 taskFunction
: taskFunction
.toString()
1363 this.emitter
?.emit(PoolEvents
.error
, error
)
1367 workerInfo
.dynamic
= true
1369 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1370 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1372 workerInfo
.ready
= true
1374 this.checkAndEmitDynamicWorkerCreationEvents()
1375 return workerNodeKey
1379 * Registers a listener callback on the worker given its worker node key.
1381 * @param workerNodeKey - The worker node key.
1382 * @param listener - The message listener callback.
1384 protected abstract registerWorkerMessageListener
<
1385 Message
extends Data
| Response
1387 workerNodeKey
: number,
1388 listener
: (message
: MessageValue
<Message
>) => void
1392 * Registers once a listener callback on the worker given its worker node key.
1394 * @param workerNodeKey - The worker node key.
1395 * @param listener - The message listener callback.
1397 protected abstract registerOnceWorkerMessageListener
<
1398 Message
extends Data
| Response
1400 workerNodeKey
: number,
1401 listener
: (message
: MessageValue
<Message
>) => void
1405 * Deregisters a listener callback on the worker given its worker node key.
1407 * @param workerNodeKey - The worker node key.
1408 * @param listener - The message listener callback.
1410 protected abstract deregisterWorkerMessageListener
<
1411 Message
extends Data
| Response
1413 workerNodeKey
: number,
1414 listener
: (message
: MessageValue
<Message
>) => void
1418 * Method hooked up after a worker node has been newly created.
1419 * Can be overridden.
1421 * @param workerNodeKey - The newly created worker node key.
1423 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1424 // Listen to worker messages.
1425 this.registerWorkerMessageListener(
1427 this.workerMessageListener
1429 // Send the startup message to worker.
1430 this.sendStartupMessageToWorker(workerNodeKey
)
1431 // Send the statistics message to worker.
1432 this.sendStatisticsMessageToWorker(workerNodeKey
)
1433 if (this.opts
.enableTasksQueue
=== true) {
1434 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1435 this.workerNodes
[workerNodeKey
].on(
1437 this.handleIdleWorkerNodeEvent
1440 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1441 this.workerNodes
[workerNodeKey
].on(
1443 this.handleBackPressureEvent
1450 * Sends the startup message to worker given its worker node key.
1452 * @param workerNodeKey - The worker node key.
1454 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1457 * Sends the statistics message to worker given its worker node key.
1459 * @param workerNodeKey - The worker node key.
1461 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1462 this.sendToWorker(workerNodeKey
, {
1465 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1467 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1473 private handleTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1474 if (this.shallExecuteTask(workerNodeKey
)) {
1475 this.executeTask(workerNodeKey
, task
)
1477 this.enqueueTask(workerNodeKey
, task
)
1481 private redistributeQueuedTasks (workerNodeKey
: number): void {
1482 if (this.workerNodes
.length
<= 1) {
1485 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1486 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1487 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1488 return workerNode
.info
.ready
&&
1489 workerNode
.usage
.tasks
.queued
<
1490 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1497 destinationWorkerNodeKey
,
1498 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1503 private updateTaskStolenStatisticsWorkerUsage (
1504 workerNodeKey
: number,
1507 const workerNode
= this.workerNodes
[workerNodeKey
]
1508 if (workerNode
?.usage
!= null) {
1509 ++workerNode
.usage
.tasks
.stolen
1512 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1513 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1515 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1518 ++taskFunctionWorkerUsage
.tasks
.stolen
1522 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1523 workerNodeKey
: number
1525 const workerNode
= this.workerNodes
[workerNodeKey
]
1526 if (workerNode
?.usage
!= null) {
1527 ++workerNode
.usage
.tasks
.sequentiallyStolen
1531 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1532 workerNodeKey
: number,
1535 const workerNode
= this.workerNodes
[workerNodeKey
]
1537 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1538 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1540 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1543 ++taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
1547 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1548 workerNodeKey
: number
1550 const workerNode
= this.workerNodes
[workerNodeKey
]
1551 if (workerNode
?.usage
!= null) {
1552 workerNode
.usage
.tasks
.sequentiallyStolen
= 0
1556 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1557 workerNodeKey
: number,
1560 const workerNode
= this.workerNodes
[workerNodeKey
]
1562 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1563 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1565 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1568 taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
= 0
1572 private readonly handleIdleWorkerNodeEvent
= (
1573 eventDetail
: WorkerNodeEventDetail
,
1574 previousStolenTask
?: Task
<Data
>
1576 if (this.workerNodes
.length
<= 1) {
1579 const { workerNodeKey
} = eventDetail
1580 if (workerNodeKey
== null) {
1582 'WorkerNode event detail workerNodeKey attribute must be defined'
1585 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1587 previousStolenTask
!= null &&
1588 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1589 (workerNodeTasksUsage
.executing
> 0 ||
1590 this.tasksQueueSize(workerNodeKey
) > 0)
1592 for (const taskName
of this.workerNodes
[workerNodeKey
].info
1593 .taskFunctionNames
as string[]) {
1594 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1599 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1602 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1604 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1607 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1609 ].getTaskFunctionWorkerUsage(stolenTask
.name
as string)
1610 ?.tasks
as TaskStatistics
1612 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1613 (previousStolenTask
!= null &&
1614 previousStolenTask
.name
=== stolenTask
.name
&&
1615 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1617 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1619 stolenTask
.name
as string
1622 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1624 stolenTask
.name
as string
1628 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1630 this.handleIdleWorkerNodeEvent(eventDetail
, stolenTask
)
1633 .catch(EMPTY_FUNCTION
)
1636 private readonly workerNodeStealTask
= (
1637 workerNodeKey
: number
1638 ): Task
<Data
> | undefined => {
1639 const workerNodes
= this.workerNodes
1642 (workerNodeA
, workerNodeB
) =>
1643 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1645 const sourceWorkerNode
= workerNodes
.find(
1646 (sourceWorkerNode
, sourceWorkerNodeKey
) =>
1647 sourceWorkerNode
.info
.ready
&&
1648 sourceWorkerNodeKey
!== workerNodeKey
&&
1649 sourceWorkerNode
.usage
.tasks
.queued
> 0
1651 if (sourceWorkerNode
!= null) {
1652 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1653 this.handleTask(workerNodeKey
, task
)
1654 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1655 this.updateTaskStolenStatisticsWorkerUsage(
1663 private readonly handleBackPressureEvent
= (
1664 eventDetail
: WorkerNodeEventDetail
1666 if (this.workerNodes
.length
<= 1) {
1669 const { workerId
} = eventDetail
1670 const sizeOffset
= 1
1671 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1674 const sourceWorkerNode
=
1675 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1676 const workerNodes
= this.workerNodes
1679 (workerNodeA
, workerNodeB
) =>
1680 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1682 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1684 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1685 workerNode
.info
.ready
&&
1686 workerNode
.info
.id
!== workerId
&&
1687 workerNode
.usage
.tasks
.queued
<
1688 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1690 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1691 this.handleTask(workerNodeKey
, task
)
1692 this.updateTaskStolenStatisticsWorkerUsage(
1701 * This method is the message listener registered on each worker.
1703 protected readonly workerMessageListener
= (
1704 message
: MessageValue
<Response
>
1706 this.checkMessageWorkerId(message
)
1707 const { workerId
, ready
, taskId
, taskFunctionNames
} = message
1708 if (ready
!= null && taskFunctionNames
!= null) {
1709 // Worker ready response received from worker
1710 this.handleWorkerReadyResponse(message
)
1711 } else if (taskId
!= null) {
1712 // Task execution response received from worker
1713 this.handleTaskExecutionResponse(message
)
1714 } else if (taskFunctionNames
!= null) {
1715 // Task function names message received from worker
1717 this.getWorkerNodeKeyByWorkerId(workerId
)
1718 ).taskFunctionNames
= taskFunctionNames
1722 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1723 const { workerId
, ready
, taskFunctionNames
} = message
1724 if (ready
=== false) {
1725 throw new Error(`Worker ${workerId as number} failed to initialize`)
1727 const workerInfo
= this.getWorkerInfo(
1728 this.getWorkerNodeKeyByWorkerId(workerId
)
1730 workerInfo
.ready
= ready
as boolean
1731 workerInfo
.taskFunctionNames
= taskFunctionNames
1732 if (!this.readyEventEmitted
&& this.ready
) {
1733 this.readyEventEmitted
= true
1734 this.emitter
?.emit(PoolEvents
.ready
, this.info
)
1738 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1739 const { workerId
, taskId
, workerError
, data
} = message
1740 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1741 if (promiseResponse
!= null) {
1742 const { resolve
, reject
, workerNodeKey
, asyncResource
} = promiseResponse
1743 if (workerError
!= null) {
1744 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1745 asyncResource
!= null
1746 ? asyncResource
.runInAsyncScope(
1751 : reject(workerError
.message
)
1753 asyncResource
!= null
1754 ? asyncResource
.runInAsyncScope(resolve
, this.emitter
, data
)
1755 : resolve(data
as Response
)
1757 asyncResource
?.emitDestroy()
1758 this.afterTaskExecutionHook(workerNodeKey
, message
)
1759 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1760 this.promiseResponseMap
.delete(taskId
as string)
1761 if (this.opts
.enableTasksQueue
=== true) {
1762 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1764 this.tasksQueueSize(workerNodeKey
) > 0 &&
1765 workerNodeTasksUsage
.executing
<
1766 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1770 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1774 workerNodeTasksUsage
.executing
=== 0 &&
1775 this.tasksQueueSize(workerNodeKey
) === 0 &&
1776 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1778 this.workerNodes
[workerNodeKey
].emit('idleWorkerNode', {
1779 workerId
: workerId
as number,
1787 private checkAndEmitTaskExecutionEvents (): void {
1789 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1793 private checkAndEmitTaskQueuingEvents (): void {
1794 if (this.hasBackPressure()) {
1795 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1799 private checkAndEmitDynamicWorkerCreationEvents (): void {
1800 if (this.type === PoolTypes
.dynamic
) {
1802 this.emitter
?.emit(PoolEvents
.full
, this.info
)
1808 * Gets the worker information given its worker node key.
1810 * @param workerNodeKey - The worker node key.
1811 * @returns The worker information.
1813 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
{
1814 return this.workerNodes
[workerNodeKey
]?.info
1818 * Creates a worker node.
1820 * @returns The created worker node.
1822 private createWorkerNode (): IWorkerNode
<Worker
, Data
> {
1823 const workerNode
= new WorkerNode
<Worker
, Data
>(
1828 workerOptions
: this.opts
.workerOptions
,
1829 tasksQueueBackPressureSize
:
1830 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1833 // Flag the worker node as ready at pool startup.
1834 if (this.starting
) {
1835 workerNode
.info
.ready
= true
1841 * Adds the given worker node in the pool worker nodes.
1843 * @param workerNode - The worker node.
1844 * @returns The added worker node key.
1845 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1847 private addWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): number {
1848 this.workerNodes
.push(workerNode
)
1849 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
1850 if (workerNodeKey
=== -1) {
1851 throw new Error('Worker added not found in worker nodes')
1853 return workerNodeKey
1857 * Removes the worker node associated to the given worker from the pool worker nodes.
1859 * @param worker - The worker.
1861 private removeWorkerNode (worker
: Worker
): void {
1862 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1863 if (workerNodeKey
!== -1) {
1864 this.workerNodes
.splice(workerNodeKey
, 1)
1865 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
1869 protected flagWorkerNodeAsNotReady (workerNodeKey
: number): void {
1870 this.getWorkerInfo(workerNodeKey
).ready
= false
1874 public hasWorkerNodeBackPressure (workerNodeKey
: number): boolean {
1876 this.opts
.enableTasksQueue
=== true &&
1877 this.workerNodes
[workerNodeKey
].hasBackPressure()
1881 private hasBackPressure (): boolean {
1883 this.opts
.enableTasksQueue
=== true &&
1884 this.workerNodes
.findIndex(
1885 workerNode
=> !workerNode
.hasBackPressure()
1891 * Executes the given task on the worker given its worker node key.
1893 * @param workerNodeKey - The worker node key.
1894 * @param task - The task to execute.
1896 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1897 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1898 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1899 this.checkAndEmitTaskExecutionEvents()
1902 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1903 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1904 this.checkAndEmitTaskQueuingEvents()
1905 return tasksQueueSize
1908 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1909 return this.workerNodes
[workerNodeKey
].dequeueTask()
1912 private tasksQueueSize (workerNodeKey
: number): number {
1913 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1916 protected flushTasksQueue (workerNodeKey
: number): void {
1917 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1920 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1923 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1926 private flushTasksQueues (): void {
1927 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1928 this.flushTasksQueue(workerNodeKey
)