1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
5 import { AsyncResource
} from
'node:async_hooks'
8 PromiseResponseWrapper
,
10 } from
'../utility-types'
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
25 import { KillBehaviors
} from
'../worker/worker-options'
26 import type { TaskFunction
} from
'../worker/task-functions'
34 type TasksQueueOptions
41 WorkerNodeEventDetail
,
46 type MeasurementStatisticsRequirements
,
48 WorkerChoiceStrategies
,
49 type WorkerChoiceStrategy
,
50 type WorkerChoiceStrategyOptions
51 } from
'./selection-strategies/selection-strategies-types'
52 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
53 import { version
} from
'./version'
54 import { WorkerNode
} from
'./worker-node'
57 checkValidTasksQueueOptions
,
58 checkValidWorkerChoiceStrategy
,
59 updateMeasurementStatistics
,
64 * Base class that implements some shared logic for all poolifier pools.
66 * @typeParam Worker - Type of worker which manages this pool.
67 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
68 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
70 export abstract class AbstractPool
<
71 Worker
extends IWorker
,
74 > implements IPool
<Worker
, Data
, Response
> {
76 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
79 public emitter
?: EventEmitterAsyncResource
82 * Dynamic pool maximum size property placeholder.
84 protected readonly max
?: number
87 * The task execution response promise map:
88 * - `key`: The message id of each submitted task.
89 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
91 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
93 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
94 new Map
<string, PromiseResponseWrapper
<Response
>>()
97 * Worker choice strategy context referencing a worker choice algorithm implementation.
99 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
106 * The task functions added at runtime map:
107 * - `key`: The task function name.
108 * - `value`: The task function itself.
110 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
113 * Whether the pool is started or not.
115 private started
: boolean
117 * Whether the pool is starting or not.
119 private starting
: boolean
121 * Whether the pool is destroying or not.
123 private destroying
: boolean
125 * Whether the pool ready event has been emitted or not.
127 private readyEventEmitted
: boolean
129 * The start timestamp of the pool.
131 private readonly startTimestamp
134 * Constructs a new poolifier pool.
136 * @param numberOfWorkers - Number of workers that this pool should manage.
137 * @param filePath - Path to the worker file.
138 * @param opts - Options for the pool.
141 protected readonly numberOfWorkers
: number,
142 protected readonly filePath
: string,
143 protected readonly opts
: PoolOptions
<Worker
>
145 if (!this.isMain()) {
147 'Cannot start a pool from a worker with the same type as the pool'
150 checkFilePath(this.filePath
)
151 this.checkNumberOfWorkers(this.numberOfWorkers
)
152 this.checkPoolOptions(this.opts
)
154 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
155 this.executeTask
= this.executeTask
.bind(this)
156 this.enqueueTask
= this.enqueueTask
.bind(this)
158 if (this.opts
.enableEvents
=== true) {
159 this.initializeEventEmitter()
161 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
167 this.opts
.workerChoiceStrategy
,
168 this.opts
.workerChoiceStrategyOptions
173 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
176 this.starting
= false
177 this.destroying
= false
178 this.readyEventEmitted
= false
179 if (this.opts
.startWorkers
=== true) {
183 this.startTimestamp
= performance
.now()
186 private checkNumberOfWorkers (numberOfWorkers
: number): void {
187 if (numberOfWorkers
== null) {
189 'Cannot instantiate a pool without specifying the number of workers'
191 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
193 'Cannot instantiate a pool with a non safe integer number of workers'
195 } else if (numberOfWorkers
< 0) {
196 throw new RangeError(
197 'Cannot instantiate a pool with a negative number of workers'
199 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
200 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
204 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
205 if (isPlainObject(opts
)) {
206 this.opts
.startWorkers
= opts
.startWorkers
?? true
207 checkValidWorkerChoiceStrategy(
208 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
210 this.opts
.workerChoiceStrategy
=
211 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
212 this.checkValidWorkerChoiceStrategyOptions(
213 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
215 this.opts
.workerChoiceStrategyOptions
= {
216 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
217 ...opts
.workerChoiceStrategyOptions
219 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
220 this.opts
.enableEvents
= opts
.enableEvents
?? true
221 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
222 if (this.opts
.enableTasksQueue
) {
223 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
224 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
225 opts
.tasksQueueOptions
as TasksQueueOptions
229 throw new TypeError('Invalid pool options: must be a plain object')
233 private checkValidWorkerChoiceStrategyOptions (
234 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
237 workerChoiceStrategyOptions
!= null &&
238 !isPlainObject(workerChoiceStrategyOptions
)
241 'Invalid worker choice strategy options: must be a plain object'
245 workerChoiceStrategyOptions
?.retries
!= null &&
246 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
249 'Invalid worker choice strategy options: retries must be an integer'
253 workerChoiceStrategyOptions
?.retries
!= null &&
254 workerChoiceStrategyOptions
.retries
< 0
256 throw new RangeError(
257 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
261 workerChoiceStrategyOptions
?.weights
!= null &&
262 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
265 'Invalid worker choice strategy options: must have a weight for each worker node'
269 workerChoiceStrategyOptions
?.measurement
!= null &&
270 !Object.values(Measurements
).includes(
271 workerChoiceStrategyOptions
.measurement
275 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
280 private initializeEventEmitter (): void {
281 this.emitter
= new EventEmitterAsyncResource({
282 name
: `poolifier:${this.type}-${this.worker}-pool`
287 public get
info (): PoolInfo
{
292 started
: this.started
,
294 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
295 minSize
: this.minSize
,
296 maxSize
: this.maxSize
,
297 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
298 .runTime
.aggregate
&&
299 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
300 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
301 workerNodes
: this.workerNodes
.length
,
302 idleWorkerNodes
: this.workerNodes
.reduce(
303 (accumulator
, workerNode
) =>
304 workerNode
.usage
.tasks
.executing
=== 0
309 busyWorkerNodes
: this.workerNodes
.reduce(
310 (accumulator
, _workerNode
, workerNodeKey
) =>
311 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
314 executedTasks
: this.workerNodes
.reduce(
315 (accumulator
, workerNode
) =>
316 accumulator
+ workerNode
.usage
.tasks
.executed
,
319 executingTasks
: this.workerNodes
.reduce(
320 (accumulator
, workerNode
) =>
321 accumulator
+ workerNode
.usage
.tasks
.executing
,
324 ...(this.opts
.enableTasksQueue
=== true && {
325 queuedTasks
: this.workerNodes
.reduce(
326 (accumulator
, workerNode
) =>
327 accumulator
+ workerNode
.usage
.tasks
.queued
,
331 ...(this.opts
.enableTasksQueue
=== true && {
332 maxQueuedTasks
: this.workerNodes
.reduce(
333 (accumulator
, workerNode
) =>
334 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
338 ...(this.opts
.enableTasksQueue
=== true && {
339 backPressure
: this.hasBackPressure()
341 ...(this.opts
.enableTasksQueue
=== true && {
342 stolenTasks
: this.workerNodes
.reduce(
343 (accumulator
, workerNode
) =>
344 accumulator
+ workerNode
.usage
.tasks
.stolen
,
348 failedTasks
: this.workerNodes
.reduce(
349 (accumulator
, workerNode
) =>
350 accumulator
+ workerNode
.usage
.tasks
.failed
,
353 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
354 .runTime
.aggregate
&& {
358 ...this.workerNodes
.map(
359 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
365 ...this.workerNodes
.map(
366 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
370 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
371 .runTime
.average
&& {
374 this.workerNodes
.reduce
<number[]>(
375 (accumulator
, workerNode
) =>
376 accumulator
.concat(workerNode
.usage
.runTime
.history
),
382 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
386 this.workerNodes
.reduce
<number[]>(
387 (accumulator
, workerNode
) =>
388 accumulator
.concat(workerNode
.usage
.runTime
.history
),
396 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
397 .waitTime
.aggregate
&& {
401 ...this.workerNodes
.map(
402 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
408 ...this.workerNodes
.map(
409 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
413 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
414 .waitTime
.average
&& {
417 this.workerNodes
.reduce
<number[]>(
418 (accumulator
, workerNode
) =>
419 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
425 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
426 .waitTime
.median
&& {
429 this.workerNodes
.reduce
<number[]>(
430 (accumulator
, workerNode
) =>
431 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
443 * The pool readiness boolean status.
445 private get
ready (): boolean {
447 this.workerNodes
.reduce(
448 (accumulator
, workerNode
) =>
449 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
458 * The approximate pool utilization.
460 * @returns The pool utilization.
462 private get
utilization (): number {
463 const poolTimeCapacity
=
464 (performance
.now() - this.startTimestamp
) * this.maxSize
465 const totalTasksRunTime
= this.workerNodes
.reduce(
466 (accumulator
, workerNode
) =>
467 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
470 const totalTasksWaitTime
= this.workerNodes
.reduce(
471 (accumulator
, workerNode
) =>
472 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
475 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
481 * If it is `'dynamic'`, it provides the `max` property.
483 protected abstract get
type (): PoolType
488 protected abstract get
worker (): WorkerType
491 * The pool minimum size.
493 protected get
minSize (): number {
494 return this.numberOfWorkers
498 * The pool maximum size.
500 protected get
maxSize (): number {
501 return this.max
?? this.numberOfWorkers
505 * Checks if the worker id sent in the received message from a worker is valid.
507 * @param message - The received message.
508 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
510 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
511 if (message
.workerId
== null) {
512 throw new Error('Worker message received without worker id')
513 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
515 `Worker message received from unknown worker '${message.workerId}'`
521 * Gets the given worker its worker node key.
523 * @param worker - The worker.
524 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
526 private getWorkerNodeKeyByWorker (worker
: Worker
): number {
527 return this.workerNodes
.findIndex(
528 workerNode
=> workerNode
.worker
=== worker
533 * Gets the worker node key given its worker id.
535 * @param workerId - The worker id.
536 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
538 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
539 return this.workerNodes
.findIndex(
540 workerNode
=> workerNode
.info
.id
=== workerId
545 public setWorkerChoiceStrategy (
546 workerChoiceStrategy
: WorkerChoiceStrategy
,
547 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
549 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
550 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
551 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
552 this.opts
.workerChoiceStrategy
554 if (workerChoiceStrategyOptions
!= null) {
555 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
557 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
558 workerNode
.resetUsage()
559 this.sendStatisticsMessageToWorker(workerNodeKey
)
564 public setWorkerChoiceStrategyOptions (
565 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
567 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
568 this.opts
.workerChoiceStrategyOptions
= {
569 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
570 ...workerChoiceStrategyOptions
572 this.workerChoiceStrategyContext
.setOptions(
573 this.opts
.workerChoiceStrategyOptions
578 public enableTasksQueue (
580 tasksQueueOptions
?: TasksQueueOptions
582 if (this.opts
.enableTasksQueue
=== true && !enable
) {
583 this.unsetTaskStealing()
584 this.unsetTasksStealingOnBackPressure()
585 this.flushTasksQueues()
587 this.opts
.enableTasksQueue
= enable
588 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
592 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
593 if (this.opts
.enableTasksQueue
=== true) {
594 checkValidTasksQueueOptions(tasksQueueOptions
)
595 this.opts
.tasksQueueOptions
=
596 this.buildTasksQueueOptions(tasksQueueOptions
)
597 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
598 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
599 this.unsetTaskStealing()
600 this.setTaskStealing()
602 this.unsetTaskStealing()
604 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
605 this.unsetTasksStealingOnBackPressure()
606 this.setTasksStealingOnBackPressure()
608 this.unsetTasksStealingOnBackPressure()
610 } else if (this.opts
.tasksQueueOptions
!= null) {
611 delete this.opts
.tasksQueueOptions
615 private buildTasksQueueOptions (
616 tasksQueueOptions
: TasksQueueOptions
617 ): TasksQueueOptions
{
620 size
: Math.pow(this.maxSize
, 2),
623 tasksStealingOnBackPressure
: true
629 private setTasksQueueSize (size
: number): void {
630 for (const workerNode
of this.workerNodes
) {
631 workerNode
.tasksQueueBackPressureSize
= size
635 private setTaskStealing (): void {
636 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
637 this.workerNodes
[workerNodeKey
].on(
639 this.handleIdleWorkerNodeEvent
644 private unsetTaskStealing (): void {
645 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
646 this.workerNodes
[workerNodeKey
].off(
648 this.handleIdleWorkerNodeEvent
653 private setTasksStealingOnBackPressure (): void {
654 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
655 this.workerNodes
[workerNodeKey
].on(
657 this.handleBackPressureEvent
662 private unsetTasksStealingOnBackPressure (): void {
663 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
664 this.workerNodes
[workerNodeKey
].off(
666 this.handleBackPressureEvent
672 * Whether the pool is full or not.
674 * The pool filling boolean status.
676 protected get
full (): boolean {
677 return this.workerNodes
.length
>= this.maxSize
681 * Whether the pool is busy or not.
683 * The pool busyness boolean status.
685 protected abstract get
busy (): boolean
688 * Whether worker nodes are executing concurrently their tasks quota or not.
690 * @returns Worker nodes busyness boolean status.
692 protected internalBusy (): boolean {
693 if (this.opts
.enableTasksQueue
=== true) {
695 this.workerNodes
.findIndex(
697 workerNode
.info
.ready
&&
698 workerNode
.usage
.tasks
.executing
<
699 (this.opts
.tasksQueueOptions
?.concurrency
as number)
704 this.workerNodes
.findIndex(
706 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
711 private isWorkerNodeBusy (workerNodeKey
: number): boolean {
712 if (this.opts
.enableTasksQueue
=== true) {
714 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
>=
715 (this.opts
.tasksQueueOptions
?.concurrency
as number)
718 return this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
> 0
721 private async sendTaskFunctionOperationToWorker (
722 workerNodeKey
: number,
723 message
: MessageValue
<Data
>
724 ): Promise
<boolean> {
725 return await new Promise
<boolean>((resolve
, reject
) => {
726 const taskFunctionOperationListener
= (
727 message
: MessageValue
<Response
>
729 this.checkMessageWorkerId(message
)
730 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
732 message
.taskFunctionOperationStatus
!= null &&
733 message
.workerId
=== workerId
735 if (message
.taskFunctionOperationStatus
) {
737 } else if (!message
.taskFunctionOperationStatus
) {
740 `Task function operation '${
741 message.taskFunctionOperation as string
742 }' failed on worker ${message.workerId} with error: '${
743 message.workerError?.message as string
748 this.deregisterWorkerMessageListener(
749 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
750 taskFunctionOperationListener
754 this.registerWorkerMessageListener(
756 taskFunctionOperationListener
758 this.sendToWorker(workerNodeKey
, message
)
762 private async sendTaskFunctionOperationToWorkers (
763 message
: MessageValue
<Data
>
764 ): Promise
<boolean> {
765 return await new Promise
<boolean>((resolve
, reject
) => {
766 const responsesReceived
= new Array<MessageValue
<Response
>>()
767 const taskFunctionOperationsListener
= (
768 message
: MessageValue
<Response
>
770 this.checkMessageWorkerId(message
)
771 if (message
.taskFunctionOperationStatus
!= null) {
772 responsesReceived
.push(message
)
773 if (responsesReceived
.length
=== this.workerNodes
.length
) {
775 responsesReceived
.every(
776 message
=> message
.taskFunctionOperationStatus
=== true
781 responsesReceived
.some(
782 message
=> message
.taskFunctionOperationStatus
=== false
785 const errorResponse
= responsesReceived
.find(
786 response
=> response
.taskFunctionOperationStatus
=== false
790 `Task function operation '${
791 message.taskFunctionOperation as string
792 }' failed on worker ${
793 errorResponse?.workerId as number
795 errorResponse?.workerError?.message as string
800 this.deregisterWorkerMessageListener(
801 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
802 taskFunctionOperationsListener
807 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
808 this.registerWorkerMessageListener(
810 taskFunctionOperationsListener
812 this.sendToWorker(workerNodeKey
, message
)
818 public hasTaskFunction (name
: string): boolean {
819 for (const workerNode
of this.workerNodes
) {
821 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
822 workerNode
.info
.taskFunctionNames
.includes(name
)
831 public async addTaskFunction (
833 fn
: TaskFunction
<Data
, Response
>
834 ): Promise
<boolean> {
835 if (typeof name
!== 'string') {
836 throw new TypeError('name argument must be a string')
838 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
839 throw new TypeError('name argument must not be an empty string')
841 if (typeof fn
!== 'function') {
842 throw new TypeError('fn argument must be a function')
844 const opResult
= await this.sendTaskFunctionOperationToWorkers({
845 taskFunctionOperation
: 'add',
846 taskFunctionName
: name
,
847 taskFunction
: fn
.toString()
849 this.taskFunctions
.set(name
, fn
)
854 public async removeTaskFunction (name
: string): Promise
<boolean> {
855 if (!this.taskFunctions
.has(name
)) {
857 'Cannot remove a task function not handled on the pool side'
860 const opResult
= await this.sendTaskFunctionOperationToWorkers({
861 taskFunctionOperation
: 'remove',
862 taskFunctionName
: name
864 this.deleteTaskFunctionWorkerUsages(name
)
865 this.taskFunctions
.delete(name
)
870 public listTaskFunctionNames (): string[] {
871 for (const workerNode
of this.workerNodes
) {
873 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
874 workerNode
.info
.taskFunctionNames
.length
> 0
876 return workerNode
.info
.taskFunctionNames
883 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
884 return await this.sendTaskFunctionOperationToWorkers({
885 taskFunctionOperation
: 'default',
886 taskFunctionName
: name
890 private deleteTaskFunctionWorkerUsages (name
: string): void {
891 for (const workerNode
of this.workerNodes
) {
892 workerNode
.deleteTaskFunctionWorkerUsage(name
)
896 private shallExecuteTask (workerNodeKey
: number): boolean {
898 this.tasksQueueSize(workerNodeKey
) === 0 &&
899 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
900 (this.opts
.tasksQueueOptions
?.concurrency
as number)
905 public async execute (
908 transferList
?: TransferListItem
[]
909 ): Promise
<Response
> {
910 return await new Promise
<Response
>((resolve
, reject
) => {
912 reject(new Error('Cannot execute a task on not started pool'))
915 if (this.destroying
) {
916 reject(new Error('Cannot execute a task on destroying pool'))
919 if (name
!= null && typeof name
!== 'string') {
920 reject(new TypeError('name argument must be a string'))
925 typeof name
=== 'string' &&
926 name
.trim().length
=== 0
928 reject(new TypeError('name argument must not be an empty string'))
931 if (transferList
!= null && !Array.isArray(transferList
)) {
932 reject(new TypeError('transferList argument must be an array'))
935 const timestamp
= performance
.now()
936 const workerNodeKey
= this.chooseWorkerNode()
937 const task
: Task
<Data
> = {
938 name
: name
?? DEFAULT_TASK_NAME
,
939 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
940 data
: data
?? ({} as Data
),
945 this.promiseResponseMap
.set(task
.taskId
as string, {
949 ...(this.emitter
!= null && {
950 asyncResource
: new AsyncResource('poolifier:task', {
951 triggerAsyncId
: this.emitter
.asyncId
,
952 requireManualDestroy
: true
957 this.opts
.enableTasksQueue
=== false ||
958 (this.opts
.enableTasksQueue
=== true &&
959 this.shallExecuteTask(workerNodeKey
))
961 this.executeTask(workerNodeKey
, task
)
963 this.enqueueTask(workerNodeKey
, task
)
969 public start (): void {
971 throw new Error('Cannot start an already started pool')
974 throw new Error('Cannot start an already starting pool')
976 if (this.destroying
) {
977 throw new Error('Cannot start a destroying pool')
981 this.workerNodes
.reduce(
982 (accumulator
, workerNode
) =>
983 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
985 ) < this.numberOfWorkers
987 this.createAndSetupWorkerNode()
989 this.starting
= false
994 public async destroy (): Promise
<void> {
996 throw new Error('Cannot destroy an already destroyed pool')
999 throw new Error('Cannot destroy an starting pool')
1001 if (this.destroying
) {
1002 throw new Error('Cannot destroy an already destroying pool')
1004 this.destroying
= true
1006 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
1007 await this.destroyWorkerNode(workerNodeKey
)
1010 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
1011 this.emitter
?.emitDestroy()
1012 this.emitter
?.removeAllListeners()
1013 this.readyEventEmitted
= false
1014 this.destroying
= false
1015 this.started
= false
1018 protected async sendKillMessageToWorker (
1019 workerNodeKey
: number
1021 await new Promise
<void>((resolve
, reject
) => {
1022 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1023 this.checkMessageWorkerId(message
)
1024 if (message
.kill
=== 'success') {
1026 } else if (message
.kill
=== 'failure') {
1029 `Kill message handling failed on worker ${
1030 message.workerId as number
1036 // FIXME: should be registered only once
1037 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1038 this.sendToWorker(workerNodeKey
, { kill
: true })
1043 * Terminates the worker node given its worker node key.
1045 * @param workerNodeKey - The worker node key.
1047 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1048 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1049 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1050 const workerNode
= this.workerNodes
[workerNodeKey
]
1051 await waitWorkerNodeEvents(workerNode
, 'taskFinished', flushedTasks
)
1052 await this.sendKillMessageToWorker(workerNodeKey
)
1053 await workerNode
.terminate()
1057 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1058 * Can be overridden.
1062 protected setupHook (): void {
1063 /* Intentionally empty */
1067 * Should return whether the worker is the main worker or not.
1069 protected abstract isMain (): boolean
1072 * Hook executed before the worker task execution.
1073 * Can be overridden.
1075 * @param workerNodeKey - The worker node key.
1076 * @param task - The task to execute.
1078 protected beforeTaskExecutionHook (
1079 workerNodeKey
: number,
1082 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1083 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1084 ++workerUsage
.tasks
.executing
1085 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1088 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1089 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1093 const taskFunctionWorkerUsage
= this.workerNodes
[
1095 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1096 ++taskFunctionWorkerUsage
.tasks
.executing
1097 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1102 * Hook executed after the worker task execution.
1103 * Can be overridden.
1105 * @param workerNodeKey - The worker node key.
1106 * @param message - The received message.
1108 protected afterTaskExecutionHook (
1109 workerNodeKey
: number,
1110 message
: MessageValue
<Response
>
1112 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1113 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1114 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1115 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1116 this.updateEluWorkerUsage(workerUsage
, message
)
1119 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1120 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1121 message
.taskPerformance
?.name
as string
1124 const taskFunctionWorkerUsage
= this.workerNodes
[
1126 ].getTaskFunctionWorkerUsage(
1127 message
.taskPerformance
?.name
as string
1129 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1130 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1131 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
1136 * Whether the worker node shall update its task function worker usage or not.
1138 * @param workerNodeKey - The worker node key.
1139 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1141 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1142 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1144 workerInfo
!= null &&
1145 Array.isArray(workerInfo
.taskFunctionNames
) &&
1146 workerInfo
.taskFunctionNames
.length
> 2
1150 private updateTaskStatisticsWorkerUsage (
1151 workerUsage
: WorkerUsage
,
1152 message
: MessageValue
<Response
>
1154 const workerTaskStatistics
= workerUsage
.tasks
1156 workerTaskStatistics
.executing
!= null &&
1157 workerTaskStatistics
.executing
> 0
1159 --workerTaskStatistics
.executing
1161 if (message
.workerError
== null) {
1162 ++workerTaskStatistics
.executed
1164 ++workerTaskStatistics
.failed
1168 private updateRunTimeWorkerUsage (
1169 workerUsage
: WorkerUsage
,
1170 message
: MessageValue
<Response
>
1172 if (message
.workerError
!= null) {
1175 updateMeasurementStatistics(
1176 workerUsage
.runTime
,
1177 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1178 message
.taskPerformance
?.runTime
?? 0
1182 private updateWaitTimeWorkerUsage (
1183 workerUsage
: WorkerUsage
,
1186 const timestamp
= performance
.now()
1187 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1188 updateMeasurementStatistics(
1189 workerUsage
.waitTime
,
1190 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1195 private updateEluWorkerUsage (
1196 workerUsage
: WorkerUsage
,
1197 message
: MessageValue
<Response
>
1199 if (message
.workerError
!= null) {
1202 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1203 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1204 updateMeasurementStatistics(
1205 workerUsage
.elu
.active
,
1206 eluTaskStatisticsRequirements
,
1207 message
.taskPerformance
?.elu
?.active
?? 0
1209 updateMeasurementStatistics(
1210 workerUsage
.elu
.idle
,
1211 eluTaskStatisticsRequirements
,
1212 message
.taskPerformance
?.elu
?.idle
?? 0
1214 if (eluTaskStatisticsRequirements
.aggregate
) {
1215 if (message
.taskPerformance
?.elu
!= null) {
1216 if (workerUsage
.elu
.utilization
!= null) {
1217 workerUsage
.elu
.utilization
=
1218 (workerUsage
.elu
.utilization
+
1219 message
.taskPerformance
.elu
.utilization
) /
1222 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
1229 * Chooses a worker node for the next task.
1231 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1233 * @returns The chosen worker node key
1235 private chooseWorkerNode (): number {
1236 if (this.shallCreateDynamicWorker()) {
1237 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1239 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1241 return workerNodeKey
1244 return this.workerChoiceStrategyContext
.execute()
1248 * Conditions for dynamic worker creation.
1250 * @returns Whether to create a dynamic worker or not.
1252 private shallCreateDynamicWorker (): boolean {
1253 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
1257 * Sends a message to worker given its worker node key.
1259 * @param workerNodeKey - The worker node key.
1260 * @param message - The message.
1261 * @param transferList - The optional array of transferable objects.
1263 protected abstract sendToWorker (
1264 workerNodeKey
: number,
1265 message
: MessageValue
<Data
>,
1266 transferList
?: TransferListItem
[]
1270 * Creates a new, completely set up worker node.
1272 * @returns New, completely set up worker node key.
1274 protected createAndSetupWorkerNode (): number {
1275 const workerNode
= this.createWorkerNode()
1276 workerNode
.registerWorkerEventHandler(
1278 this.opts
.onlineHandler
?? EMPTY_FUNCTION
1280 workerNode
.registerWorkerEventHandler(
1282 this.opts
.messageHandler
?? EMPTY_FUNCTION
1284 workerNode
.registerWorkerEventHandler(
1286 this.opts
.errorHandler
?? EMPTY_FUNCTION
1288 workerNode
.registerWorkerEventHandler('error', (error
: Error) => {
1289 workerNode
.info
.ready
= false
1290 this.emitter
?.emit(PoolEvents
.error
, error
)
1295 this.opts
.restartWorkerOnError
=== true
1297 if (workerNode
.info
.dynamic
) {
1298 this.createAndSetupDynamicWorkerNode()
1300 this.createAndSetupWorkerNode()
1303 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1304 this.redistributeQueuedTasks(this.workerNodes
.indexOf(workerNode
))
1306 workerNode
.terminate().catch(error
=> {
1307 this.emitter
?.emit(PoolEvents
.error
, error
)
1310 workerNode
.registerWorkerEventHandler(
1312 this.opts
.exitHandler
?? EMPTY_FUNCTION
1314 workerNode
.registerOnceWorkerEventHandler('exit', () => {
1315 this.removeWorkerNode(workerNode
)
1317 const workerNodeKey
= this.addWorkerNode(workerNode
)
1318 this.afterWorkerNodeSetup(workerNodeKey
)
1319 return workerNodeKey
1323 * Creates a new, completely set up dynamic worker node.
1325 * @returns New, completely set up dynamic worker node key.
1327 protected createAndSetupDynamicWorkerNode (): number {
1328 const workerNodeKey
= this.createAndSetupWorkerNode()
1329 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1330 this.checkMessageWorkerId(message
)
1331 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1334 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1335 // Kill message received from worker
1337 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1338 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1339 ((this.opts
.enableTasksQueue
=== false &&
1340 workerUsage
.tasks
.executing
=== 0) ||
1341 (this.opts
.enableTasksQueue
=== true &&
1342 workerUsage
.tasks
.executing
=== 0 &&
1343 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1345 // Flag the worker node as not ready immediately
1346 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
1347 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1348 this.emitter
?.emit(PoolEvents
.error
, error
)
1352 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1353 this.sendToWorker(workerNodeKey
, {
1356 if (this.taskFunctions
.size
> 0) {
1357 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1358 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1359 taskFunctionOperation
: 'add',
1361 taskFunction
: taskFunction
.toString()
1363 this.emitter
?.emit(PoolEvents
.error
, error
)
1367 workerInfo
.dynamic
= true
1369 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1370 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1372 workerInfo
.ready
= true
1374 this.checkAndEmitDynamicWorkerCreationEvents()
1375 return workerNodeKey
1379 * Registers a listener callback on the worker given its worker node key.
1381 * @param workerNodeKey - The worker node key.
1382 * @param listener - The message listener callback.
1384 protected abstract registerWorkerMessageListener
<
1385 Message
extends Data
| Response
1387 workerNodeKey
: number,
1388 listener
: (message
: MessageValue
<Message
>) => void
1392 * Registers once a listener callback on the worker given its worker node key.
1394 * @param workerNodeKey - The worker node key.
1395 * @param listener - The message listener callback.
1397 protected abstract registerOnceWorkerMessageListener
<
1398 Message
extends Data
| Response
1400 workerNodeKey
: number,
1401 listener
: (message
: MessageValue
<Message
>) => void
1405 * Deregisters a listener callback on the worker given its worker node key.
1407 * @param workerNodeKey - The worker node key.
1408 * @param listener - The message listener callback.
1410 protected abstract deregisterWorkerMessageListener
<
1411 Message
extends Data
| Response
1413 workerNodeKey
: number,
1414 listener
: (message
: MessageValue
<Message
>) => void
1418 * Method hooked up after a worker node has been newly created.
1419 * Can be overridden.
1421 * @param workerNodeKey - The newly created worker node key.
1423 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1424 // Listen to worker messages.
1425 this.registerWorkerMessageListener(
1427 this.workerMessageListener
1429 // Send the startup message to worker.
1430 this.sendStartupMessageToWorker(workerNodeKey
)
1431 // Send the statistics message to worker.
1432 this.sendStatisticsMessageToWorker(workerNodeKey
)
1433 if (this.opts
.enableTasksQueue
=== true) {
1434 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1435 this.workerNodes
[workerNodeKey
].on(
1437 this.handleIdleWorkerNodeEvent
1440 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1441 this.workerNodes
[workerNodeKey
].on(
1443 this.handleBackPressureEvent
1450 * Sends the startup message to worker given its worker node key.
1452 * @param workerNodeKey - The worker node key.
1454 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1457 * Sends the statistics message to worker given its worker node key.
1459 * @param workerNodeKey - The worker node key.
1461 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1462 this.sendToWorker(workerNodeKey
, {
1465 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1467 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1473 private handleTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1474 if (this.shallExecuteTask(workerNodeKey
)) {
1475 this.executeTask(workerNodeKey
, task
)
1477 this.enqueueTask(workerNodeKey
, task
)
1481 private redistributeQueuedTasks (workerNodeKey
: number): void {
1482 if (this.workerNodes
.length
<= 1) {
1485 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1486 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1487 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1488 return workerNode
.info
.ready
&&
1489 workerNode
.usage
.tasks
.queued
<
1490 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1497 destinationWorkerNodeKey
,
1498 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1503 private updateTaskStolenStatisticsWorkerUsage (
1504 workerNodeKey
: number,
1507 const workerNode
= this.workerNodes
[workerNodeKey
]
1508 if (workerNode
?.usage
!= null) {
1509 ++workerNode
.usage
.tasks
.stolen
1512 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1513 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1515 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1518 ++taskFunctionWorkerUsage
.tasks
.stolen
1522 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1523 workerNodeKey
: number
1525 const workerNode
= this.workerNodes
[workerNodeKey
]
1526 if (workerNode
?.usage
!= null) {
1527 ++workerNode
.usage
.tasks
.sequentiallyStolen
1531 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1532 workerNodeKey
: number,
1535 const workerNode
= this.workerNodes
[workerNodeKey
]
1537 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1538 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1540 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1543 ++taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
1547 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1548 workerNodeKey
: number
1550 const workerNode
= this.workerNodes
[workerNodeKey
]
1551 if (workerNode
?.usage
!= null) {
1552 workerNode
.usage
.tasks
.sequentiallyStolen
= 0
1556 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1557 workerNodeKey
: number,
1560 const workerNode
= this.workerNodes
[workerNodeKey
]
1562 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1563 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1565 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1568 taskFunctionWorkerUsage
.tasks
.sequentiallyStolen
= 0
1572 private readonly handleIdleWorkerNodeEvent
= (
1573 eventDetail
: WorkerNodeEventDetail
,
1574 previousStolenTask
?: Task
<Data
>
1576 if (this.workerNodes
.length
<= 1) {
1579 const { workerNodeKey
} = eventDetail
1580 if (workerNodeKey
== null) {
1582 'WorkerNode event detail workerNodeKey attribute must be defined'
1585 const workerNodeTasksUsage
= this.workerNodes
[workerNodeKey
].usage
.tasks
1587 previousStolenTask
!= null &&
1588 workerNodeTasksUsage
.sequentiallyStolen
> 0 &&
1589 (workerNodeTasksUsage
.executing
> 0 ||
1590 this.tasksQueueSize(workerNodeKey
) > 0)
1592 for (const taskName
of this.workerNodes
[workerNodeKey
].info
1593 .taskFunctionNames
as string[]) {
1594 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1599 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1602 const stolenTask
= this.workerNodeStealTask(workerNodeKey
)
1604 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1607 const taskFunctionTasksWorkerUsage
= this.workerNodes
[
1609 ].getTaskFunctionWorkerUsage(stolenTask
.name
as string)
1610 ?.tasks
as TaskStatistics
1612 taskFunctionTasksWorkerUsage
.sequentiallyStolen
=== 0 ||
1613 (previousStolenTask
!= null &&
1614 previousStolenTask
.name
=== stolenTask
.name
&&
1615 taskFunctionTasksWorkerUsage
.sequentiallyStolen
> 0)
1617 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1619 stolenTask
.name
as string
1622 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1624 stolenTask
.name
as string
1628 sleep(exponentialDelay(workerNodeTasksUsage
.sequentiallyStolen
))
1630 this.handleIdleWorkerNodeEvent(eventDetail
, stolenTask
)
1633 .catch(EMPTY_FUNCTION
)
1636 private readonly workerNodeStealTask
= (
1637 workerNodeKey
: number
1638 ): Task
<Data
> | undefined => {
1639 const workerNodes
= this.workerNodes
1642 (workerNodeA
, workerNodeB
) =>
1643 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1645 const sourceWorkerNode
= workerNodes
.find(
1646 (sourceWorkerNode
, sourceWorkerNodeKey
) =>
1647 sourceWorkerNode
.info
.ready
&&
1648 sourceWorkerNodeKey
!== workerNodeKey
&&
1649 sourceWorkerNode
.usage
.tasks
.queued
> 0
1651 if (sourceWorkerNode
!= null) {
1652 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1653 this.handleTask(workerNodeKey
, task
)
1654 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey
)
1655 this.updateTaskStolenStatisticsWorkerUsage(
1663 private readonly handleBackPressureEvent
= (
1664 eventDetail
: WorkerNodeEventDetail
1666 if (this.workerNodes
.length
<= 1) {
1669 const { workerId
} = eventDetail
1670 const sizeOffset
= 1
1671 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1674 const sourceWorkerNode
=
1675 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1676 const workerNodes
= this.workerNodes
1679 (workerNodeA
, workerNodeB
) =>
1680 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1682 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1684 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1685 workerNode
.info
.ready
&&
1686 workerNode
.info
.id
!== workerId
&&
1687 workerNode
.usage
.tasks
.queued
<
1688 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1690 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1691 this.handleTask(workerNodeKey
, task
)
1692 this.updateTaskStolenStatisticsWorkerUsage(
1701 * This method is the message listener registered on each worker.
1703 protected readonly workerMessageListener
= (
1704 message
: MessageValue
<Response
>
1706 this.checkMessageWorkerId(message
)
1707 const { workerId
, ready
, taskId
, taskFunctionNames
} = message
1708 if (ready
!= null && taskFunctionNames
!= null) {
1709 // Worker ready response received from worker
1710 this.handleWorkerReadyResponse(message
)
1711 } else if (taskId
!= null) {
1712 // Task execution response received from worker
1713 this.handleTaskExecutionResponse(message
)
1714 } else if (taskFunctionNames
!= null) {
1715 // Task function names message received from worker
1717 this.getWorkerNodeKeyByWorkerId(workerId
)
1718 ).taskFunctionNames
= taskFunctionNames
1722 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1723 const { workerId
, ready
, taskFunctionNames
} = message
1724 if (ready
=== false) {
1725 throw new Error(`Worker ${workerId as number} failed to initialize`)
1727 const workerInfo
= this.getWorkerInfo(
1728 this.getWorkerNodeKeyByWorkerId(workerId
)
1730 workerInfo
.ready
= ready
as boolean
1731 workerInfo
.taskFunctionNames
= taskFunctionNames
1732 if (!this.readyEventEmitted
&& this.ready
) {
1733 this.readyEventEmitted
= true
1734 this.emitter
?.emit(PoolEvents
.ready
, this.info
)
1738 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1739 const { workerId
, taskId
, workerError
, data
} = message
1740 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1741 if (promiseResponse
!= null) {
1742 const { resolve
, reject
, workerNodeKey
, asyncResource
} = promiseResponse
1743 const workerNode
= this.workerNodes
[workerNodeKey
]
1744 if (workerError
!= null) {
1745 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1746 asyncResource
!= null
1747 ? asyncResource
.runInAsyncScope(
1752 : reject(workerError
.message
)
1754 asyncResource
!= null
1755 ? asyncResource
.runInAsyncScope(resolve
, this.emitter
, data
)
1756 : resolve(data
as Response
)
1758 asyncResource
?.emitDestroy()
1759 this.afterTaskExecutionHook(workerNodeKey
, message
)
1760 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1761 this.promiseResponseMap
.delete(taskId
as string)
1762 workerNode
?.emit('taskFinished', taskId
)
1763 if (this.opts
.enableTasksQueue
=== true) {
1764 const workerNodeTasksUsage
= workerNode
.usage
.tasks
1766 this.tasksQueueSize(workerNodeKey
) > 0 &&
1767 workerNodeTasksUsage
.executing
<
1768 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1772 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1776 workerNodeTasksUsage
.executing
=== 0 &&
1777 this.tasksQueueSize(workerNodeKey
) === 0 &&
1778 workerNodeTasksUsage
.sequentiallyStolen
=== 0
1780 workerNode
.emit('idleWorkerNode', {
1781 workerId
: workerId
as number,
1789 private checkAndEmitTaskExecutionEvents (): void {
1791 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1795 private checkAndEmitTaskQueuingEvents (): void {
1796 if (this.hasBackPressure()) {
1797 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1801 private checkAndEmitDynamicWorkerCreationEvents (): void {
1802 if (this.type === PoolTypes
.dynamic
) {
1804 this.emitter
?.emit(PoolEvents
.full
, this.info
)
1810 * Gets the worker information given its worker node key.
1812 * @param workerNodeKey - The worker node key.
1813 * @returns The worker information.
1815 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
{
1816 return this.workerNodes
[workerNodeKey
]?.info
1820 * Creates a worker node.
1822 * @returns The created worker node.
1824 private createWorkerNode (): IWorkerNode
<Worker
, Data
> {
1825 const workerNode
= new WorkerNode
<Worker
, Data
>(
1830 workerOptions
: this.opts
.workerOptions
,
1831 tasksQueueBackPressureSize
:
1832 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1835 // Flag the worker node as ready at pool startup.
1836 if (this.starting
) {
1837 workerNode
.info
.ready
= true
1843 * Adds the given worker node in the pool worker nodes.
1845 * @param workerNode - The worker node.
1846 * @returns The added worker node key.
1847 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1849 private addWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): number {
1850 this.workerNodes
.push(workerNode
)
1851 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
1852 if (workerNodeKey
=== -1) {
1853 throw new Error('Worker added not found in worker nodes')
1855 return workerNodeKey
1859 * Removes the worker node from the pool worker nodes.
1861 * @param workerNode - The worker node.
1863 private removeWorkerNode (workerNode
: IWorkerNode
<Worker
, Data
>): void {
1864 const workerNodeKey
= this.workerNodes
.indexOf(workerNode
)
1865 if (workerNodeKey
!== -1) {
1866 this.workerNodes
.splice(workerNodeKey
, 1)
1867 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
1871 protected flagWorkerNodeAsNotReady (workerNodeKey
: number): void {
1872 this.getWorkerInfo(workerNodeKey
).ready
= false
1876 public hasWorkerNodeBackPressure (workerNodeKey
: number): boolean {
1878 this.opts
.enableTasksQueue
=== true &&
1879 this.workerNodes
[workerNodeKey
].hasBackPressure()
1883 private hasBackPressure (): boolean {
1885 this.opts
.enableTasksQueue
=== true &&
1886 this.workerNodes
.findIndex(
1887 workerNode
=> !workerNode
.hasBackPressure()
1893 * Executes the given task on the worker given its worker node key.
1895 * @param workerNodeKey - The worker node key.
1896 * @param task - The task to execute.
1898 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1899 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1900 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1901 this.checkAndEmitTaskExecutionEvents()
1904 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1905 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1906 this.checkAndEmitTaskQueuingEvents()
1907 return tasksQueueSize
1910 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1911 return this.workerNodes
[workerNodeKey
].dequeueTask()
1914 private tasksQueueSize (workerNodeKey
: number): number {
1915 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1918 protected flushTasksQueue (workerNodeKey
: number): number {
1919 let flushedTasks
= 0
1920 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1923 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1927 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1931 private flushTasksQueues (): void {
1932 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1933 this.flushTasksQueue(workerNodeKey
)