1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
5 import { AsyncResource
} from
'node:async_hooks'
8 PromiseResponseWrapper
,
10 } from
'../utility-types.js'
24 import { KillBehaviors
} from
'../worker/worker-options.js'
25 import type { TaskFunction
} from
'../worker/task-functions.js'
33 type TasksQueueOptions
39 WorkerNodeEventDetail
,
44 WorkerChoiceStrategies
,
45 type WorkerChoiceStrategy
,
46 type WorkerChoiceStrategyOptions
47 } from
'./selection-strategies/selection-strategies-types.js'
48 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context.js'
49 import { version
} from
'./version.js'
50 import { WorkerNode
} from
'./worker-node.js'
53 checkValidTasksQueueOptions
,
54 checkValidWorkerChoiceStrategy
,
55 getDefaultTasksQueueOptions
,
57 updateRunTimeWorkerUsage
,
58 updateTaskStatisticsWorkerUsage
,
59 updateWaitTimeWorkerUsage
,
64 * Base class that implements some shared logic for all poolifier pools.
66 * @typeParam Worker - Type of worker which manages this pool.
67 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
68 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
70 export abstract class AbstractPool
<
71 Worker
extends IWorker
,
74 > implements IPool
<Worker
, Data
, Response
> {
76 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
79 public emitter
?: EventEmitterAsyncResource
82 * The task execution response promise map:
83 * - `key`: The message id of each submitted task.
84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
88 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
89 new Map
<string, PromiseResponseWrapper
<Response
>>()
92 * Worker choice strategy context referencing a worker choice algorithm implementation.
94 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
105 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
108 * Whether the pool is started or not.
110 private started
: boolean
112 * Whether the pool is starting or not.
114 private starting
: boolean
116 * Whether the pool is destroying or not.
118 private destroying
: boolean
120 * Whether the pool ready event has been emitted or not.
122 private readyEventEmitted
: boolean
124 * The start timestamp of the pool.
126 private readonly startTimestamp
129 * Constructs a new poolifier pool.
131 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
132 * @param filePath - Path to the worker file.
133 * @param opts - Options for the pool.
134 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
137 protected readonly minimumNumberOfWorkers
: number,
138 protected readonly filePath
: string,
139 protected readonly opts
: PoolOptions
<Worker
>,
140 protected readonly maximumNumberOfWorkers
?: number
142 if (!this.isMain()) {
144 'Cannot start a pool from a worker with the same type as the pool'
148 checkFilePath(this.filePath
)
149 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers
)
150 this.checkPoolOptions(this.opts
)
152 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
153 this.executeTask
= this.executeTask
.bind(this)
154 this.enqueueTask
= this.enqueueTask
.bind(this)
156 if (this.opts
.enableEvents
=== true) {
157 this.initializeEventEmitter()
159 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
165 this.opts
.workerChoiceStrategy
,
166 this.opts
.workerChoiceStrategyOptions
171 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
174 this.starting
= false
175 this.destroying
= false
176 this.readyEventEmitted
= false
177 if (this.opts
.startWorkers
=== true) {
181 this.startTimestamp
= performance
.now()
184 private checkPoolType (): void {
185 if (this.type === PoolTypes
.fixed
&& this.maximumNumberOfWorkers
!= null) {
187 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
192 private checkMinimumNumberOfWorkers (minimumNumberOfWorkers
: number): void {
193 if (minimumNumberOfWorkers
== null) {
195 'Cannot instantiate a pool without specifying the number of workers'
197 } else if (!Number.isSafeInteger(minimumNumberOfWorkers
)) {
199 'Cannot instantiate a pool with a non safe integer number of workers'
201 } else if (minimumNumberOfWorkers
< 0) {
202 throw new RangeError(
203 'Cannot instantiate a pool with a negative number of workers'
205 } else if (this.type === PoolTypes
.fixed
&& minimumNumberOfWorkers
=== 0) {
206 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
210 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
211 if (isPlainObject(opts
)) {
212 this.opts
.startWorkers
= opts
.startWorkers
?? true
213 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
214 checkValidWorkerChoiceStrategy(opts
.workerChoiceStrategy
!)
215 this.opts
.workerChoiceStrategy
=
216 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
217 this.checkValidWorkerChoiceStrategyOptions(
218 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
219 opts
.workerChoiceStrategyOptions
!
221 if (opts
.workerChoiceStrategyOptions
!= null) {
222 this.opts
.workerChoiceStrategyOptions
= opts
.workerChoiceStrategyOptions
224 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
225 this.opts
.enableEvents
= opts
.enableEvents
?? true
226 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
227 if (this.opts
.enableTasksQueue
) {
228 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
229 checkValidTasksQueueOptions(opts
.tasksQueueOptions
!)
230 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
231 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
232 opts
.tasksQueueOptions
!
236 throw new TypeError('Invalid pool options: must be a plain object')
240 private checkValidWorkerChoiceStrategyOptions (
241 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
244 workerChoiceStrategyOptions
!= null &&
245 !isPlainObject(workerChoiceStrategyOptions
)
248 'Invalid worker choice strategy options: must be a plain object'
252 workerChoiceStrategyOptions
?.weights
!= null &&
253 Object.keys(workerChoiceStrategyOptions
.weights
).length
!==
254 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
257 'Invalid worker choice strategy options: must have a weight for each worker node'
261 workerChoiceStrategyOptions
?.measurement
!= null &&
262 !Object.values(Measurements
).includes(
263 workerChoiceStrategyOptions
.measurement
267 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
272 private initializeEventEmitter (): void {
273 this.emitter
= new EventEmitterAsyncResource({
274 name
: `poolifier:${this.type}-${this.worker}-pool`
279 public get
info (): PoolInfo
{
284 started
: this.started
,
286 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
287 strategy
: this.opts
.workerChoiceStrategy
!,
288 minSize
: this.minimumNumberOfWorkers
,
289 maxSize
: this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
,
290 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
291 .runTime
.aggregate
&&
292 this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
293 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
294 workerNodes
: this.workerNodes
.length
,
295 idleWorkerNodes
: this.workerNodes
.reduce(
296 (accumulator
, workerNode
) =>
297 workerNode
.usage
.tasks
.executing
=== 0
302 ...(this.opts
.enableTasksQueue
=== true && {
303 stealingWorkerNodes
: this.workerNodes
.reduce(
304 (accumulator
, workerNode
) =>
305 workerNode
.info
.stealing
? accumulator
+ 1 : accumulator
,
309 busyWorkerNodes
: this.workerNodes
.reduce(
310 (accumulator
, _workerNode
, workerNodeKey
) =>
311 this.isWorkerNodeBusy(workerNodeKey
) ? accumulator
+ 1 : accumulator
,
314 executedTasks
: this.workerNodes
.reduce(
315 (accumulator
, workerNode
) =>
316 accumulator
+ workerNode
.usage
.tasks
.executed
,
319 executingTasks
: this.workerNodes
.reduce(
320 (accumulator
, workerNode
) =>
321 accumulator
+ workerNode
.usage
.tasks
.executing
,
324 ...(this.opts
.enableTasksQueue
=== true && {
325 queuedTasks
: this.workerNodes
.reduce(
326 (accumulator
, workerNode
) =>
327 accumulator
+ workerNode
.usage
.tasks
.queued
,
331 ...(this.opts
.enableTasksQueue
=== true && {
332 maxQueuedTasks
: this.workerNodes
.reduce(
333 (accumulator
, workerNode
) =>
334 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
338 ...(this.opts
.enableTasksQueue
=== true && {
339 backPressure
: this.hasBackPressure()
341 ...(this.opts
.enableTasksQueue
=== true && {
342 stolenTasks
: this.workerNodes
.reduce(
343 (accumulator
, workerNode
) =>
344 accumulator
+ workerNode
.usage
.tasks
.stolen
,
348 failedTasks
: this.workerNodes
.reduce(
349 (accumulator
, workerNode
) =>
350 accumulator
+ workerNode
.usage
.tasks
.failed
,
353 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
354 .runTime
.aggregate
&& {
358 ...this.workerNodes
.map(
359 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
365 ...this.workerNodes
.map(
366 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
370 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
371 .runTime
.average
&& {
374 this.workerNodes
.reduce
<number[]>(
375 (accumulator
, workerNode
) =>
376 accumulator
.concat(workerNode
.usage
.runTime
.history
),
382 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
386 this.workerNodes
.reduce
<number[]>(
387 (accumulator
, workerNode
) =>
388 accumulator
.concat(workerNode
.usage
.runTime
.history
),
396 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
397 .waitTime
.aggregate
&& {
401 ...this.workerNodes
.map(
402 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
408 ...this.workerNodes
.map(
409 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
413 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
414 .waitTime
.average
&& {
417 this.workerNodes
.reduce
<number[]>(
418 (accumulator
, workerNode
) =>
419 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
425 ...(this.workerChoiceStrategyContext
?.getTaskStatisticsRequirements()
426 .waitTime
.median
&& {
429 this.workerNodes
.reduce
<number[]>(
430 (accumulator
, workerNode
) =>
431 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
443 * The pool readiness boolean status.
445 private get
ready (): boolean {
447 this.workerNodes
.reduce(
448 (accumulator
, workerNode
) =>
449 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
453 ) >= this.minimumNumberOfWorkers
458 * The approximate pool utilization.
460 * @returns The pool utilization.
462 private get
utilization (): number {
463 const poolTimeCapacity
=
464 (performance
.now() - this.startTimestamp
) *
465 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
466 const totalTasksRunTime
= this.workerNodes
.reduce(
467 (accumulator
, workerNode
) =>
468 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
471 const totalTasksWaitTime
= this.workerNodes
.reduce(
472 (accumulator
, workerNode
) =>
473 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
476 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
482 * If it is `'dynamic'`, it provides the `max` property.
484 protected abstract get
type (): PoolType
489 protected abstract get
worker (): WorkerType
492 * Checks if the worker id sent in the received message from a worker is valid.
494 * @param message - The received message.
495 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
497 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
498 if (message
.workerId
== null) {
499 throw new Error('Worker message received without worker id')
500 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
502 `Worker message received from unknown worker '${message.workerId}'`
508 * Gets the worker node key given its worker id.
510 * @param workerId - The worker id.
511 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
513 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
514 return this.workerNodes
.findIndex(
515 workerNode
=> workerNode
.info
.id
=== workerId
520 public setWorkerChoiceStrategy (
521 workerChoiceStrategy
: WorkerChoiceStrategy
,
522 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
524 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
525 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
526 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
527 this.opts
.workerChoiceStrategy
529 if (workerChoiceStrategyOptions
!= null) {
530 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
532 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
533 workerNode
.resetUsage()
534 this.sendStatisticsMessageToWorker(workerNodeKey
)
539 public setWorkerChoiceStrategyOptions (
540 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
542 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
543 if (workerChoiceStrategyOptions
!= null) {
544 this.opts
.workerChoiceStrategyOptions
= workerChoiceStrategyOptions
546 this.workerChoiceStrategyContext
.setOptions(
547 this.opts
.workerChoiceStrategyOptions
552 public enableTasksQueue (
554 tasksQueueOptions
?: TasksQueueOptions
556 if (this.opts
.enableTasksQueue
=== true && !enable
) {
557 this.unsetTaskStealing()
558 this.unsetTasksStealingOnBackPressure()
559 this.flushTasksQueues()
561 this.opts
.enableTasksQueue
= enable
562 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
563 this.setTasksQueueOptions(tasksQueueOptions
!)
567 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
568 if (this.opts
.enableTasksQueue
=== true) {
569 checkValidTasksQueueOptions(tasksQueueOptions
)
570 this.opts
.tasksQueueOptions
=
571 this.buildTasksQueueOptions(tasksQueueOptions
)
572 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
573 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
!)
574 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
575 this.unsetTaskStealing()
576 this.setTaskStealing()
578 this.unsetTaskStealing()
580 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
581 this.unsetTasksStealingOnBackPressure()
582 this.setTasksStealingOnBackPressure()
584 this.unsetTasksStealingOnBackPressure()
586 } else if (this.opts
.tasksQueueOptions
!= null) {
587 delete this.opts
.tasksQueueOptions
591 private buildTasksQueueOptions (
592 tasksQueueOptions
: TasksQueueOptions
593 ): TasksQueueOptions
{
595 ...getDefaultTasksQueueOptions(
596 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
602 private setTasksQueueSize (size
: number): void {
603 for (const workerNode
of this.workerNodes
) {
604 workerNode
.tasksQueueBackPressureSize
= size
608 private setTaskStealing (): void {
609 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
610 this.workerNodes
[workerNodeKey
].on(
612 this.handleIdleWorkerNodeEvent
617 private unsetTaskStealing (): void {
618 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
619 this.workerNodes
[workerNodeKey
].off(
621 this.handleIdleWorkerNodeEvent
626 private setTasksStealingOnBackPressure (): void {
627 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
628 this.workerNodes
[workerNodeKey
].on(
630 this.handleBackPressureEvent
635 private unsetTasksStealingOnBackPressure (): void {
636 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
637 this.workerNodes
[workerNodeKey
].off(
639 this.handleBackPressureEvent
645 * Whether the pool is full or not.
647 * The pool filling boolean status.
649 protected get
full (): boolean {
651 this.workerNodes
.length
>=
652 (this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
)
657 * Whether the pool is busy or not.
659 * The pool busyness boolean status.
661 protected abstract get
busy (): boolean
664 * Whether worker nodes are executing concurrently their tasks quota or not.
666 * @returns Worker nodes busyness boolean status.
668 protected internalBusy (): boolean {
669 if (this.opts
.enableTasksQueue
=== true) {
671 this.workerNodes
.findIndex(
673 workerNode
.info
.ready
&&
674 workerNode
.usage
.tasks
.executing
<
675 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
676 this.opts
.tasksQueueOptions
!.concurrency
!
681 this.workerNodes
.findIndex(
683 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
688 private isWorkerNodeBusy (workerNodeKey
: number): boolean {
689 if (this.opts
.enableTasksQueue
=== true) {
691 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
>=
692 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
693 this.opts
.tasksQueueOptions
!.concurrency
!
696 return this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
> 0
699 private async sendTaskFunctionOperationToWorker (
700 workerNodeKey
: number,
701 message
: MessageValue
<Data
>
702 ): Promise
<boolean> {
703 return await new Promise
<boolean>((resolve
, reject
) => {
704 const taskFunctionOperationListener
= (
705 message
: MessageValue
<Response
>
707 this.checkMessageWorkerId(message
)
708 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
709 const workerId
= this.getWorkerInfo(workerNodeKey
).id
!
711 message
.taskFunctionOperationStatus
!= null &&
712 message
.workerId
=== workerId
714 if (message
.taskFunctionOperationStatus
) {
716 } else if (!message
.taskFunctionOperationStatus
) {
719 `Task function operation '${
720 message.taskFunctionOperation as string
721 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
722 }' failed on worker ${message.workerId} with error: '${
723 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
724 message.workerError!.message
729 this.deregisterWorkerMessageListener(
730 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
731 taskFunctionOperationListener
735 this.registerWorkerMessageListener(
737 taskFunctionOperationListener
739 this.sendToWorker(workerNodeKey
, message
)
743 private async sendTaskFunctionOperationToWorkers (
744 message
: MessageValue
<Data
>
745 ): Promise
<boolean> {
746 return await new Promise
<boolean>((resolve
, reject
) => {
747 const responsesReceived
= new Array<MessageValue
<Response
>>()
748 const taskFunctionOperationsListener
= (
749 message
: MessageValue
<Response
>
751 this.checkMessageWorkerId(message
)
752 if (message
.taskFunctionOperationStatus
!= null) {
753 responsesReceived
.push(message
)
754 if (responsesReceived
.length
=== this.workerNodes
.length
) {
756 responsesReceived
.every(
757 message
=> message
.taskFunctionOperationStatus
=== true
762 responsesReceived
.some(
763 message
=> message
.taskFunctionOperationStatus
=== false
766 const errorResponse
= responsesReceived
.find(
767 response
=> response
.taskFunctionOperationStatus
=== false
771 `Task function operation '${
772 message.taskFunctionOperation as string
773 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
774 }' failed on worker ${errorResponse!
775 .workerId!} with error: '${
776 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
777 errorResponse!.workerError!.message
782 this.deregisterWorkerMessageListener(
783 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
784 taskFunctionOperationsListener
789 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
790 this.registerWorkerMessageListener(
792 taskFunctionOperationsListener
794 this.sendToWorker(workerNodeKey
, message
)
800 public hasTaskFunction (name
: string): boolean {
801 for (const workerNode
of this.workerNodes
) {
803 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
804 workerNode
.info
.taskFunctionNames
.includes(name
)
813 public async addTaskFunction (
815 fn
: TaskFunction
<Data
, Response
>
816 ): Promise
<boolean> {
817 if (typeof name
!== 'string') {
818 throw new TypeError('name argument must be a string')
820 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
821 throw new TypeError('name argument must not be an empty string')
823 if (typeof fn
!== 'function') {
824 throw new TypeError('fn argument must be a function')
826 const opResult
= await this.sendTaskFunctionOperationToWorkers({
827 taskFunctionOperation
: 'add',
828 taskFunctionName
: name
,
829 taskFunction
: fn
.toString()
831 this.taskFunctions
.set(name
, fn
)
836 public async removeTaskFunction (name
: string): Promise
<boolean> {
837 if (!this.taskFunctions
.has(name
)) {
839 'Cannot remove a task function not handled on the pool side'
842 const opResult
= await this.sendTaskFunctionOperationToWorkers({
843 taskFunctionOperation
: 'remove',
844 taskFunctionName
: name
846 this.deleteTaskFunctionWorkerUsages(name
)
847 this.taskFunctions
.delete(name
)
852 public listTaskFunctionNames (): string[] {
853 for (const workerNode
of this.workerNodes
) {
855 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
856 workerNode
.info
.taskFunctionNames
.length
> 0
858 return workerNode
.info
.taskFunctionNames
865 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
866 return await this.sendTaskFunctionOperationToWorkers({
867 taskFunctionOperation
: 'default',
868 taskFunctionName
: name
872 private deleteTaskFunctionWorkerUsages (name
: string): void {
873 for (const workerNode
of this.workerNodes
) {
874 workerNode
.deleteTaskFunctionWorkerUsage(name
)
878 private shallExecuteTask (workerNodeKey
: number): boolean {
880 this.tasksQueueSize(workerNodeKey
) === 0 &&
881 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
882 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
883 this.opts
.tasksQueueOptions
!.concurrency
!
888 public async execute (
891 transferList
?: TransferListItem
[]
892 ): Promise
<Response
> {
893 return await new Promise
<Response
>((resolve
, reject
) => {
895 reject(new Error('Cannot execute a task on not started pool'))
898 if (this.destroying
) {
899 reject(new Error('Cannot execute a task on destroying pool'))
902 if (name
!= null && typeof name
!== 'string') {
903 reject(new TypeError('name argument must be a string'))
908 typeof name
=== 'string' &&
909 name
.trim().length
=== 0
911 reject(new TypeError('name argument must not be an empty string'))
914 if (transferList
!= null && !Array.isArray(transferList
)) {
915 reject(new TypeError('transferList argument must be an array'))
918 const timestamp
= performance
.now()
919 const workerNodeKey
= this.chooseWorkerNode()
920 const task
: Task
<Data
> = {
921 name
: name
?? DEFAULT_TASK_NAME
,
922 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
923 data
: data
?? ({} as Data
),
928 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
929 this.promiseResponseMap
.set(task
.taskId
!, {
933 ...(this.emitter
!= null && {
934 asyncResource
: new AsyncResource('poolifier:task', {
935 triggerAsyncId
: this.emitter
.asyncId
,
936 requireManualDestroy
: true
941 this.opts
.enableTasksQueue
=== false ||
942 (this.opts
.enableTasksQueue
=== true &&
943 this.shallExecuteTask(workerNodeKey
))
945 this.executeTask(workerNodeKey
, task
)
947 this.enqueueTask(workerNodeKey
, task
)
953 public start (): void {
955 throw new Error('Cannot start an already started pool')
958 throw new Error('Cannot start an already starting pool')
960 if (this.destroying
) {
961 throw new Error('Cannot start a destroying pool')
965 this.workerNodes
.reduce(
966 (accumulator
, workerNode
) =>
967 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
969 ) < this.minimumNumberOfWorkers
971 this.createAndSetupWorkerNode()
973 this.starting
= false
978 public async destroy (): Promise
<void> {
980 throw new Error('Cannot destroy an already destroyed pool')
983 throw new Error('Cannot destroy an starting pool')
985 if (this.destroying
) {
986 throw new Error('Cannot destroy an already destroying pool')
988 this.destroying
= true
990 this.workerNodes
.map(async (_workerNode
, workerNodeKey
) => {
991 await this.destroyWorkerNode(workerNodeKey
)
994 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
995 this.emitter
?.emitDestroy()
996 this.emitter
?.removeAllListeners()
997 this.readyEventEmitted
= false
998 this.destroying
= false
1002 private async sendKillMessageToWorker (workerNodeKey
: number): Promise
<void> {
1003 await new Promise
<void>((resolve
, reject
) => {
1004 if (this.workerNodes
?.[workerNodeKey
] == null) {
1008 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1009 this.checkMessageWorkerId(message
)
1010 if (message
.kill
=== 'success') {
1012 } else if (message
.kill
=== 'failure') {
1015 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1016 `Kill message handling failed on worker ${message.workerId!}`
1021 // FIXME: should be registered only once
1022 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1023 this.sendToWorker(workerNodeKey
, { kill
: true })
1028 * Terminates the worker node given its worker node key.
1030 * @param workerNodeKey - The worker node key.
1032 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
1033 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1034 const flushedTasks
= this.flushTasksQueue(workerNodeKey
)
1035 const workerNode
= this.workerNodes
[workerNodeKey
]
1036 await waitWorkerNodeEvents(
1040 this.opts
.tasksQueueOptions
?.tasksFinishedTimeout
??
1041 getDefaultTasksQueueOptions(
1042 this.maximumNumberOfWorkers
?? this.minimumNumberOfWorkers
1043 ).tasksFinishedTimeout
1045 await this.sendKillMessageToWorker(workerNodeKey
)
1046 await workerNode
.terminate()
1050 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1051 * Can be overridden.
1055 protected setupHook (): void {
1056 /* Intentionally empty */
1060 * Should return whether the worker is the main worker or not.
1062 protected abstract isMain (): boolean
1065 * Hook executed before the worker task execution.
1066 * Can be overridden.
1068 * @param workerNodeKey - The worker node key.
1069 * @param task - The task to execute.
1071 protected beforeTaskExecutionHook (
1072 workerNodeKey
: number,
1075 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1076 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1077 ++workerUsage
.tasks
.executing
1078 updateWaitTimeWorkerUsage(
1079 this.workerChoiceStrategyContext
,
1085 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1086 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1087 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(task
.name
!) !=
1090 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1091 const taskFunctionWorkerUsage
= this.workerNodes
[
1093 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1094 ].getTaskFunctionWorkerUsage(task
.name
!)!
1095 ++taskFunctionWorkerUsage
.tasks
.executing
1096 updateWaitTimeWorkerUsage(
1097 this.workerChoiceStrategyContext
,
1098 taskFunctionWorkerUsage
,
1105 * Hook executed after the worker task execution.
1106 * Can be overridden.
1108 * @param workerNodeKey - The worker node key.
1109 * @param message - The received message.
1111 protected afterTaskExecutionHook (
1112 workerNodeKey
: number,
1113 message
: MessageValue
<Response
>
1115 let needWorkerChoiceStrategyUpdate
= false
1116 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1117 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1118 updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1119 updateRunTimeWorkerUsage(
1120 this.workerChoiceStrategyContext
,
1124 updateEluWorkerUsage(
1125 this.workerChoiceStrategyContext
,
1129 needWorkerChoiceStrategyUpdate
= true
1132 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1133 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1134 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1135 message
.taskPerformance
!.name
1138 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1139 const taskFunctionWorkerUsage
= this.workerNodes
[
1141 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1142 ].getTaskFunctionWorkerUsage(message
.taskPerformance
!.name
)!
1143 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1144 updateRunTimeWorkerUsage(
1145 this.workerChoiceStrategyContext
,
1146 taskFunctionWorkerUsage
,
1149 updateEluWorkerUsage(
1150 this.workerChoiceStrategyContext
,
1151 taskFunctionWorkerUsage
,
1154 needWorkerChoiceStrategyUpdate
= true
1156 if (needWorkerChoiceStrategyUpdate
) {
1157 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1162 * Whether the worker node shall update its task function worker usage or not.
1164 * @param workerNodeKey - The worker node key.
1165 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1167 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1168 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1170 workerInfo
!= null &&
1171 Array.isArray(workerInfo
.taskFunctionNames
) &&
1172 workerInfo
.taskFunctionNames
.length
> 2
1177 * Chooses a worker node for the next task.
1179 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1181 * @returns The chosen worker node key
1183 private chooseWorkerNode (): number {
1184 if (this.shallCreateDynamicWorker()) {
1185 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1187 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1189 return workerNodeKey
1192 return this.workerChoiceStrategyContext
.execute()
1196 * Conditions for dynamic worker creation.
1198 * @returns Whether to create a dynamic worker or not.
1200 protected abstract shallCreateDynamicWorker (): boolean
1203 * Sends a message to worker given its worker node key.
1205 * @param workerNodeKey - The worker node key.
1206 * @param message - The message.
1207 * @param transferList - The optional array of transferable objects.
1209 protected abstract sendToWorker (
1210 workerNodeKey
: number,
1211 message
: MessageValue
<Data
>,
1212 transferList
?: TransferListItem
[]
/**
 * Creates a new, completely set up worker node.
 *
 * Registers the user supplied `online`, `message`, `error` and `exit` handlers
 * (or a no-op when not provided), the pool's own error and exit handling, then
 * adds the worker node to the pool and runs the post-setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  // Pool level error handling: flag, notify, optionally replace the worker
  // node, move its queued tasks elsewhere and finally terminate it.
  workerNode.registerWorkerEventHandler('error', (error: Error) => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (
      this.started &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    ) {
      // The replacement worker node keeps the dynamic nature of the failed one.
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (
      this.started &&
      !this.destroying &&
      this.opts.enableTasksQueue === true
    ) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // Termination failures are reported through the pool error event.
    workerNode?.terminate().catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  // Once exited, the worker node is removed from the pool worker nodes.
  workerNode.registerOnceWorkerEventHandler('exit', () => {
    this.removeWorkerNode(workerNode)
  })
  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a message listener destroys the worker node
 * when it sends a kill message, the pool task functions are sent to the
 * worker and the worker node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    // The key is resolved again from the worker id at message time instead of
    // reusing the key captured at creation time.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker
    // HARD: always destroy. SOFT: destroy only when no task is executing and,
    // if the tasks queue is enabled, when no task is queued either.
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        ((this.opts.enableTasksQueue === false &&
          workerUsage.tasks.executing === 0) ||
          (this.opts.enableTasksQueue === true &&
            workerUsage.tasks.executing === 0 &&
            this.tasksQueueSize(localWorkerNodeKey) === 0)))
    ) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Send every task function known by the pool to the new worker.
  if (this.taskFunctions.size > 0) {
    for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
      this.sendTaskFunctionOperationToWorker(workerNodeKey, {
        taskFunctionOperation: 'add',
        taskFunctionName,
        taskFunction: taskFunction.toString()
      }).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  }
  workerInfo.dynamic = true
  // The strategy policy decides whether the dynamic worker node is flagged as
  // ready right away.
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the task data or the task response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Registers once a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the task data or the task response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Deregisters a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the task data or the task response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Wires the pool message listener, sends the startup and statistics messages
 * and, when the tasks queue is enabled, subscribes the task stealing handlers
 * requested by the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  // Send the startup message, then the statistics message, to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const { tasksQueueOptions } = this.opts
  if (tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent
    )
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleBackPressureEvent
    )
  }
}
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Invoked from the worker node post-setup hook, right after the pool message
 * listener has been registered.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, { statistics: { runTime, elu } })
}
/**
 * Whether task stealing is pointless right now.
 *
 * @returns `true` when the pool has at most one worker node or no queued task.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
/**
 * Executes the task on the given worker node when allowed, enqueues it otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
/**
 * Moves every task queued on the given worker node to other worker nodes.
 *
 * Each task is handed to the ready worker node, other than the source one,
 * having the fewest queued tasks. When no other worker node is ready, the
 * first worker node that is not the source is used instead, so the source
 * queue is always drained and the loop always terminates.
 *
 * @param workerNodeKey - The key of the worker node whose queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    let fallbackWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      // Never hand a task back to the worker node being drained.
      if (candidateKey === workerNodeKey) {
        continue
      }
      if (fallbackWorkerNodeKey === -1) {
        fallbackWorkerNodeKey = candidateKey
      }
      if (
        candidate.info.ready &&
        (destinationWorkerNodeKey === -1 ||
          candidate.usage.tasks.queued <
            this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued)
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      destinationWorkerNodeKey = fallbackWorkerNodeKey
    }
    if (destinationWorkerNodeKey === -1) {
      // No other worker node exists: leave the remaining tasks queued.
      return
    }
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(workerNodeKey)!
    )
  }
}
/**
 * Increments the stolen tasks counters of a worker node.
 *
 * The worker node usage counter is always incremented when available; the
 * task function usage counter is incremented only when task function usage
 * shall be updated and a usage exists for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup: the result is narrowed instead of being fetched twice.
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.stolen
    }
  }
}
/**
 * Increments the sequentially stolen tasks counter of a worker node, if its usage is available.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage == null) {
    return
  }
  ++usage.tasks.sequentiallyStolen
}
/**
 * Increments the sequentially stolen tasks counter of a task function usage.
 *
 * Nothing happens unless task function usage shall be updated for the worker
 * node and a usage exists for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup: the result is narrowed instead of being fetched twice.
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
    }
  }
}
/**
 * Resets the sequentially stolen tasks counter of a worker node, if its usage is available.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage == null) {
    return
  }
  usage.tasks.sequentiallyStolen = 0
}
/**
 * Resets the sequentially stolen tasks counter of a task function usage.
 *
 * Nothing happens unless task function usage shall be updated for the worker
 * node and a usage exists for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup: the result is narrowed instead of being fetched twice.
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
    }
  }
}
/**
 * Handler subscribed to the `idleWorkerNode` worker node event.
 *
 * Steals a task for the idle worker node, updates the sequentially stolen
 * statistics and reschedules itself after a delay computed from the number of
 * sequentially stolen tasks. The rescheduled call receives the task stolen by
 * the previous call.
 *
 * @param eventDetail - The worker node event detail; `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous call, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `workerNodeKey` is not defined in the event detail.
 */
private readonly handleIdleWorkerNodeEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey property must be defined'
    )
  }
  // Stop when nothing can be stolen or when more than half of the worker
  // nodes are already stealing.
  if (
    this.cannotStealTask() ||
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
  ) {
    // A previous call flagged the worker node as stealing: clear the flag.
    if (previousStolenTask != null) {
      this.getWorkerInfo(workerNodeKey).stealing = false
    }
    return
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // The worker node got work since the previous steal: stop the stealing
  // sequence and reset the sequentially stolen counters.
  if (
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    this.getWorkerInfo(workerNodeKey).stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const taskName of this.workerNodes[workerNodeKey].info
      .taskFunctionNames!) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  this.getWorkerInfo(workerNodeKey).stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
    // The per task function counter grows when it starts from zero or when the
    // same task function is stolen again; otherwise it is reset.
    if (
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    ) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    }
  }
  // Reschedule after a delay derived from the sequentially stolen counter;
  // errors raised by the rescheduled call are deliberately ignored.
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
/**
 * Steals one queued task for the given worker node.
 *
 * The source is the ready, non stealing worker node, other than the given
 * one, having the most queued tasks. The stolen task is handled by the given
 * worker node and the stolen statistics are updated.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no source worker node is eligible.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  const sourceWorkerNode = workerNodes.find(
    sourceWorkerNode =>
      sourceWorkerNode.info.ready &&
      !sourceWorkerNode.info.stealing &&
      // Compare by identity: positions in the sorted copy are not worker
      // node keys.
      sourceWorkerNode !== stealingWorkerNode &&
      sourceWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode != null) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const task = sourceWorkerNode.popTask()!
    this.handleTask(workerNodeKey, task)
    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
    return task
  }
}
/**
 * Handler subscribed to the `backPressure` worker node event.
 *
 * Moves queued tasks from the worker node under back pressure to the ready,
 * non stealing worker nodes whose queue is below the configured size minus an
 * offset, visiting them from the least to the most loaded.
 *
 * @param eventDetail - The worker node event detail carrying the id of the worker under back pressure.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      // Positions in the sorted copy are not worker node keys: resolve the
      // real key before touching the pool worker nodes.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      this.getWorkerInfo(workerNodeKey).stealing = true
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      this.getWorkerInfo(workerNodeKey).stealing = false
    }
  }
}
/**
 * This method is the message listener registered on each worker.
 *
 * Dispatches, in that order of precedence, worker ready responses, task
 * execution responses and task function names messages.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
/**
 * Handles a worker ready response.
 *
 * Stores the ready flag and the task function names in the worker info, then
 * emits the pool ready event once, the first time the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it failed to initialize.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    throw new Error(`Worker ${workerId!} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(readyWorkerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
/**
 * Handles a task execution response.
 *
 * Settles the promise bound to the task id (inside the task async resource
 * scope when there is one), runs the after task execution hook, then, when
 * the tasks queue is enabled and the pool is not being destroyed, executes
 * the next queued task or emits the `idleWorkerNode` event.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse != null) {
    const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
    const workerNode = this.workerNodes[workerNodeKey]
    if (workerError != null) {
      this.emitter?.emit(PoolEvents.taskError, workerError)
      asyncResource != null
        ? asyncResource.runInAsyncScope(
          reject,
          this.emitter,
          workerError.message
        )
        : reject(workerError.message)
    } else {
      asyncResource != null
        ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
        : resolve(data as Response)
    }
    asyncResource?.emitDestroy()
    this.afterTaskExecutionHook(workerNodeKey, message)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.promiseResponseMap.delete(taskId!)
    workerNode?.emit('taskFinished', taskId)
    // The worker node may have been removed from the pool in the meantime:
    // skip the tasks queue handling instead of dereferencing `undefined`.
    if (
      this.opts.enableTasksQueue === true &&
      !this.destroying &&
      workerNode != null
    ) {
      const workerNodeTasksUsage = workerNode.usage.tasks
      if (
        this.tasksQueueSize(workerNodeKey) > 0 &&
        workerNodeTasksUsage.executing <
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          this.opts.tasksQueueOptions!.concurrency!
      ) {
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
      }
      if (
        workerNodeTasksUsage.executing === 0 &&
        this.tasksQueueSize(workerNodeKey) === 0 &&
        workerNodeTasksUsage.sequentiallyStolen === 0
      ) {
        workerNode.emit('idleWorkerNode', {
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          workerId: workerId!,
          workerNodeKey
        })
      }
    }
  }
}
/**
 * Emits the pool busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
/**
 * Emits the pool back pressure event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
/**
 * Emits dynamic worker creation events.
 *
 * Invoked at the end of the dynamic worker node creation.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
/**
 * Creates a worker node.
 *
 * The back pressure size is the configured tasks queue size or, when not
 * set, the size of the default tasks queue options computed from the pool
 * number of workers.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize
    }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
/**
 * Adds the given worker node in the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
/**
 * Removes the worker node from the pool worker nodes.
 *
 * The worker choice strategy context is told about the removed key. Nothing
 * happens when the worker node is not in the pool.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
/**
 * Flags the worker node as not ready given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
/**
 * Whether the pool has back pressure.
 *
 * @returns `true` when the tasks queue is enabled and every worker node has back pressure.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
/**
 * Executes the given task on the worker given its worker node key.
 *
 * Runs the before task execution hook, sends the task with its transfer list
 * to the worker, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node, then checks and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
/**
 * Dequeues a task from the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, if any.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
/**
 * Executes every task queued on the worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const queuedTask = this.dequeueTask(workerNodeKey)!
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}