1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
24 import { KillBehaviors
} from
'../worker/worker-options'
25 import type { TaskFunction
} from
'../worker/task-functions'
33 type TasksQueueOptions
39 WorkerNodeEventDetail
,
44 type MeasurementStatisticsRequirements
,
46 WorkerChoiceStrategies
,
47 type WorkerChoiceStrategy
,
48 type WorkerChoiceStrategyOptions
49 } from
'./selection-strategies/selection-strategies-types'
50 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
51 import { version
} from
'./version'
52 import { WorkerNode
} from
'./worker-node'
55 checkValidTasksQueueOptions
,
56 checkValidWorkerChoiceStrategy
,
57 updateMeasurementStatistics
61 * Base class that implements some shared logic for all poolifier pools.
63 * @typeParam Worker - Type of worker which manages this pool.
64 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
65 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
67 export abstract class AbstractPool
<
68 Worker
extends IWorker
,
71 > implements IPool
<Worker
, Data
, Response
> {
73 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
76 public emitter
?: EventEmitterAsyncResource
79 * Dynamic pool maximum size property placeholder.
81 protected readonly max
?: number
84 * The task execution response promise map:
85 * - `key`: The message id of each submitted task.
86 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
88 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
90 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
91 new Map
<string, PromiseResponseWrapper
<Response
>>()
94 * Worker choice strategy context referencing a worker choice algorithm implementation.
96 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
103 * The task functions added at runtime map:
104 * - `key`: The task function name.
105 * - `value`: The task function itself.
107 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
110 * Whether the pool is started or not.
112 private started
: boolean
114 * Whether the pool is starting or not.
116 private starting
: boolean
118 * Whether the pool is destroying or not.
120 private destroying
: boolean
122 * Whether the pool ready event has been emitted or not.
124 private readyEventEmitted
: boolean
126 * The start timestamp of the pool.
128 private readonly startTimestamp
131 * Constructs a new poolifier pool.
133 * @param numberOfWorkers - Number of workers that this pool should manage.
134 * @param filePath - Path to the worker file.
135 * @param opts - Options for the pool.
138 protected readonly numberOfWorkers
: number,
139 protected readonly filePath
: string,
140 protected readonly opts
: PoolOptions
<Worker
>
142 if (!this.isMain()) {
144 'Cannot start a pool from a worker with the same type as the pool'
147 checkFilePath(this.filePath
)
148 this.checkNumberOfWorkers(this.numberOfWorkers
)
149 this.checkPoolOptions(this.opts
)
151 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
152 this.executeTask
= this.executeTask
.bind(this)
153 this.enqueueTask
= this.enqueueTask
.bind(this)
155 if (this.opts
.enableEvents
=== true) {
156 this.initializeEventEmitter()
158 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
164 this.opts
.workerChoiceStrategy
,
165 this.opts
.workerChoiceStrategyOptions
170 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
173 this.starting
= false
174 this.destroying
= false
175 this.readyEventEmitted
= false
176 if (this.opts
.startWorkers
=== true) {
180 this.startTimestamp
= performance
.now()
183 private checkNumberOfWorkers (numberOfWorkers
: number): void {
184 if (numberOfWorkers
== null) {
186 'Cannot instantiate a pool without specifying the number of workers'
188 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
190 'Cannot instantiate a pool with a non safe integer number of workers'
192 } else if (numberOfWorkers
< 0) {
193 throw new RangeError(
194 'Cannot instantiate a pool with a negative number of workers'
196 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
197 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
201 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
202 if (isPlainObject(opts
)) {
203 this.opts
.startWorkers
= opts
.startWorkers
?? true
204 checkValidWorkerChoiceStrategy(
205 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
207 this.opts
.workerChoiceStrategy
=
208 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
209 this.checkValidWorkerChoiceStrategyOptions(
210 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
212 this.opts
.workerChoiceStrategyOptions
= {
213 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
214 ...opts
.workerChoiceStrategyOptions
216 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
217 this.opts
.enableEvents
= opts
.enableEvents
?? true
218 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
219 if (this.opts
.enableTasksQueue
) {
220 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
221 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
222 opts
.tasksQueueOptions
as TasksQueueOptions
226 throw new TypeError('Invalid pool options: must be a plain object')
230 private checkValidWorkerChoiceStrategyOptions (
231 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
234 workerChoiceStrategyOptions
!= null &&
235 !isPlainObject(workerChoiceStrategyOptions
)
238 'Invalid worker choice strategy options: must be a plain object'
242 workerChoiceStrategyOptions
?.retries
!= null &&
243 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
246 'Invalid worker choice strategy options: retries must be an integer'
250 workerChoiceStrategyOptions
?.retries
!= null &&
251 workerChoiceStrategyOptions
.retries
< 0
253 throw new RangeError(
254 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
258 workerChoiceStrategyOptions
?.weights
!= null &&
259 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
262 'Invalid worker choice strategy options: must have a weight for each worker node'
266 workerChoiceStrategyOptions
?.measurement
!= null &&
267 !Object.values(Measurements
).includes(
268 workerChoiceStrategyOptions
.measurement
272 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
277 private initializeEventEmitter (): void {
278 this.emitter
= new EventEmitterAsyncResource({
279 name
: `poolifier:${this.type}-${this.worker}-pool`
284 public get
info (): PoolInfo
{
289 started
: this.started
,
291 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
292 minSize
: this.minSize
,
293 maxSize
: this.maxSize
,
294 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
295 .runTime
.aggregate
&&
296 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
297 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
298 workerNodes
: this.workerNodes
.length
,
299 idleWorkerNodes
: this.workerNodes
.reduce(
300 (accumulator
, workerNode
) =>
301 workerNode
.usage
.tasks
.executing
=== 0
306 busyWorkerNodes
: this.workerNodes
.reduce(
307 (accumulator
, workerNode
) =>
308 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
311 executedTasks
: this.workerNodes
.reduce(
312 (accumulator
, workerNode
) =>
313 accumulator
+ workerNode
.usage
.tasks
.executed
,
316 executingTasks
: this.workerNodes
.reduce(
317 (accumulator
, workerNode
) =>
318 accumulator
+ workerNode
.usage
.tasks
.executing
,
321 ...(this.opts
.enableTasksQueue
=== true && {
322 queuedTasks
: this.workerNodes
.reduce(
323 (accumulator
, workerNode
) =>
324 accumulator
+ workerNode
.usage
.tasks
.queued
,
328 ...(this.opts
.enableTasksQueue
=== true && {
329 maxQueuedTasks
: this.workerNodes
.reduce(
330 (accumulator
, workerNode
) =>
331 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
335 ...(this.opts
.enableTasksQueue
=== true && {
336 backPressure
: this.hasBackPressure()
338 ...(this.opts
.enableTasksQueue
=== true && {
339 stolenTasks
: this.workerNodes
.reduce(
340 (accumulator
, workerNode
) =>
341 accumulator
+ workerNode
.usage
.tasks
.stolen
,
345 failedTasks
: this.workerNodes
.reduce(
346 (accumulator
, workerNode
) =>
347 accumulator
+ workerNode
.usage
.tasks
.failed
,
350 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
351 .runTime
.aggregate
&& {
355 ...this.workerNodes
.map(
356 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
362 ...this.workerNodes
.map(
363 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
367 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
368 .runTime
.average
&& {
371 this.workerNodes
.reduce
<number[]>(
372 (accumulator
, workerNode
) =>
373 accumulator
.concat(workerNode
.usage
.runTime
.history
),
379 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
383 this.workerNodes
.reduce
<number[]>(
384 (accumulator
, workerNode
) =>
385 accumulator
.concat(workerNode
.usage
.runTime
.history
),
393 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
394 .waitTime
.aggregate
&& {
398 ...this.workerNodes
.map(
399 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
405 ...this.workerNodes
.map(
406 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
410 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
411 .waitTime
.average
&& {
414 this.workerNodes
.reduce
<number[]>(
415 (accumulator
, workerNode
) =>
416 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
422 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
423 .waitTime
.median
&& {
426 this.workerNodes
.reduce
<number[]>(
427 (accumulator
, workerNode
) =>
428 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
440 * The pool readiness boolean status.
442 private get
ready (): boolean {
444 this.workerNodes
.reduce(
445 (accumulator
, workerNode
) =>
446 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
455 * The approximate pool utilization.
457 * @returns The pool utilization.
459 private get
utilization (): number {
460 const poolTimeCapacity
=
461 (performance
.now() - this.startTimestamp
) * this.maxSize
462 const totalTasksRunTime
= this.workerNodes
.reduce(
463 (accumulator
, workerNode
) =>
464 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
467 const totalTasksWaitTime
= this.workerNodes
.reduce(
468 (accumulator
, workerNode
) =>
469 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
472 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
478 * If it is `'dynamic'`, it provides the `max` property.
480 protected abstract get
type (): PoolType
485 protected abstract get
worker (): WorkerType
488 * The pool minimum size.
490 protected get
minSize (): number {
491 return this.numberOfWorkers
495 * The pool maximum size.
497 protected get
maxSize (): number {
498 return this.max
?? this.numberOfWorkers
502 * Checks if the worker id sent in the received message from a worker is valid.
504 * @param message - The received message.
505 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
507 private checkMessageWorkerId (message
: MessageValue
<Data
| Response
>): void {
508 if (message
.workerId
== null) {
509 throw new Error('Worker message received without worker id')
510 } else if (this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1) {
512 `Worker message received from unknown worker '${message.workerId}'`
518 * Gets the worker node key of the given worker.
520 * @param worker - The worker.
521 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
523 private getWorkerNodeKeyByWorker (worker
: Worker
): number {
524 return this.workerNodes
.findIndex(
525 workerNode
=> workerNode
.worker
=== worker
530 * Gets the worker node key given its worker id.
532 * @param workerId - The worker id.
533 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
535 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
536 return this.workerNodes
.findIndex(
537 workerNode
=> workerNode
.info
.id
=== workerId
542 public setWorkerChoiceStrategy (
543 workerChoiceStrategy
: WorkerChoiceStrategy
,
544 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
546 checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
547 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
548 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
549 this.opts
.workerChoiceStrategy
551 if (workerChoiceStrategyOptions
!= null) {
552 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
554 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
555 workerNode
.resetUsage()
556 this.sendStatisticsMessageToWorker(workerNodeKey
)
561 public setWorkerChoiceStrategyOptions (
562 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
564 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
565 this.opts
.workerChoiceStrategyOptions
= {
566 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
567 ...workerChoiceStrategyOptions
569 this.workerChoiceStrategyContext
.setOptions(
570 this.opts
.workerChoiceStrategyOptions
575 public enableTasksQueue (
577 tasksQueueOptions
?: TasksQueueOptions
579 if (this.opts
.enableTasksQueue
=== true && !enable
) {
580 this.unsetTaskStealing()
581 this.unsetTasksStealingOnBackPressure()
582 this.flushTasksQueues()
584 this.opts
.enableTasksQueue
= enable
585 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
589 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
590 if (this.opts
.enableTasksQueue
=== true) {
591 checkValidTasksQueueOptions(tasksQueueOptions
)
592 this.opts
.tasksQueueOptions
=
593 this.buildTasksQueueOptions(tasksQueueOptions
)
594 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
595 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
596 this.setTaskStealing()
598 this.unsetTaskStealing()
600 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
601 this.setTasksStealingOnBackPressure()
603 this.unsetTasksStealingOnBackPressure()
605 } else if (this.opts
.tasksQueueOptions
!= null) {
606 delete this.opts
.tasksQueueOptions
610 private buildTasksQueueOptions (
611 tasksQueueOptions
: TasksQueueOptions
612 ): TasksQueueOptions
{
615 size
: Math.pow(this.maxSize
, 2),
618 tasksStealingOnBackPressure
: true
624 private setTasksQueueSize (size
: number): void {
625 for (const workerNode
of this.workerNodes
) {
626 workerNode
.tasksQueueBackPressureSize
= size
630 private setTaskStealing (): void {
631 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
632 this.workerNodes
[workerNodeKey
].addEventListener(
634 this.handleIdleWorkerNodeEvent
as EventListener
639 private unsetTaskStealing (): void {
640 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
641 this.workerNodes
[workerNodeKey
].removeEventListener(
643 this.handleIdleWorkerNodeEvent
as EventListener
648 private setTasksStealingOnBackPressure (): void {
649 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
650 this.workerNodes
[workerNodeKey
].addEventListener(
652 this.handleBackPressureEvent
as EventListener
657 private unsetTasksStealingOnBackPressure (): void {
658 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
659 this.workerNodes
[workerNodeKey
].removeEventListener(
661 this.handleBackPressureEvent
as EventListener
667 * Whether the pool is full or not.
669 * The pool filling boolean status.
671 protected get
full (): boolean {
672 return this.workerNodes
.length
>= this.maxSize
676 * Whether the pool is busy or not.
678 * The pool busyness boolean status.
680 protected abstract get
busy (): boolean
683 * Whether worker nodes are concurrently executing their tasks quota or not.
685 * @returns Worker nodes busyness boolean status.
687 protected internalBusy (): boolean {
688 if (this.opts
.enableTasksQueue
=== true) {
690 this.workerNodes
.findIndex(
692 workerNode
.info
.ready
&&
693 workerNode
.usage
.tasks
.executing
<
694 (this.opts
.tasksQueueOptions
?.concurrency
as number)
699 this.workerNodes
.findIndex(
701 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
706 private async sendTaskFunctionOperationToWorker (
707 workerNodeKey
: number,
708 message
: MessageValue
<Data
>
709 ): Promise
<boolean> {
710 return await new Promise
<boolean>((resolve
, reject
) => {
711 const taskFunctionOperationListener
= (
712 message
: MessageValue
<Response
>
714 this.checkMessageWorkerId(message
)
715 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
717 message
.taskFunctionOperationStatus
!= null &&
718 message
.workerId
=== workerId
720 if (message
.taskFunctionOperationStatus
) {
722 } else if (!message
.taskFunctionOperationStatus
) {
725 `Task function operation '${
726 message.taskFunctionOperation as string
727 }' failed on worker ${message.workerId} with error: '${
728 message.workerError?.message as string
733 this.deregisterWorkerMessageListener(
734 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
735 taskFunctionOperationListener
739 this.registerWorkerMessageListener(
741 taskFunctionOperationListener
743 this.sendToWorker(workerNodeKey
, message
)
747 private async sendTaskFunctionOperationToWorkers (
748 message
: MessageValue
<Data
>
749 ): Promise
<boolean> {
750 return await new Promise
<boolean>((resolve
, reject
) => {
751 const responsesReceived
= new Array<MessageValue
<Response
>>()
752 const taskFunctionOperationsListener
= (
753 message
: MessageValue
<Response
>
755 this.checkMessageWorkerId(message
)
756 if (message
.taskFunctionOperationStatus
!= null) {
757 responsesReceived
.push(message
)
758 if (responsesReceived
.length
=== this.workerNodes
.length
) {
760 responsesReceived
.every(
761 message
=> message
.taskFunctionOperationStatus
=== true
766 responsesReceived
.some(
767 message
=> message
.taskFunctionOperationStatus
=== false
770 const errorResponse
= responsesReceived
.find(
771 response
=> response
.taskFunctionOperationStatus
=== false
775 `Task function operation '${
776 message.taskFunctionOperation as string
777 }' failed on worker ${
778 errorResponse?.workerId as number
780 errorResponse?.workerError?.message as string
785 this.deregisterWorkerMessageListener(
786 this.getWorkerNodeKeyByWorkerId(message
.workerId
),
787 taskFunctionOperationsListener
792 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
793 this.registerWorkerMessageListener(
795 taskFunctionOperationsListener
797 this.sendToWorker(workerNodeKey
, message
)
803 public hasTaskFunction (name
: string): boolean {
804 for (const workerNode
of this.workerNodes
) {
806 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
807 workerNode
.info
.taskFunctionNames
.includes(name
)
816 public async addTaskFunction (
818 fn
: TaskFunction
<Data
, Response
>
819 ): Promise
<boolean> {
820 if (typeof name
!== 'string') {
821 throw new TypeError('name argument must be a string')
823 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
824 throw new TypeError('name argument must not be an empty string')
826 if (typeof fn
!== 'function') {
827 throw new TypeError('fn argument must be a function')
829 const opResult
= await this.sendTaskFunctionOperationToWorkers({
830 taskFunctionOperation
: 'add',
831 taskFunctionName
: name
,
832 taskFunction
: fn
.toString()
834 this.taskFunctions
.set(name
, fn
)
839 public async removeTaskFunction (name
: string): Promise
<boolean> {
840 if (!this.taskFunctions
.has(name
)) {
842 'Cannot remove a task function not handled on the pool side'
845 const opResult
= await this.sendTaskFunctionOperationToWorkers({
846 taskFunctionOperation
: 'remove',
847 taskFunctionName
: name
849 this.deleteTaskFunctionWorkerUsages(name
)
850 this.taskFunctions
.delete(name
)
855 public listTaskFunctionNames (): string[] {
856 for (const workerNode
of this.workerNodes
) {
858 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
859 workerNode
.info
.taskFunctionNames
.length
> 0
861 return workerNode
.info
.taskFunctionNames
868 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
869 return await this.sendTaskFunctionOperationToWorkers({
870 taskFunctionOperation
: 'default',
871 taskFunctionName
: name
875 private deleteTaskFunctionWorkerUsages (name
: string): void {
876 for (const workerNode
of this.workerNodes
) {
877 workerNode
.deleteTaskFunctionWorkerUsage(name
)
881 private shallExecuteTask (workerNodeKey
: number): boolean {
883 this.tasksQueueSize(workerNodeKey
) === 0 &&
884 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
885 (this.opts
.tasksQueueOptions
?.concurrency
as number)
890 public async execute (
893 transferList
?: TransferListItem
[]
894 ): Promise
<Response
> {
895 return await new Promise
<Response
>((resolve
, reject
) => {
897 reject(new Error('Cannot execute a task on not started pool'))
900 if (this.destroying
) {
901 reject(new Error('Cannot execute a task on destroying pool'))
904 if (name
!= null && typeof name
!== 'string') {
905 reject(new TypeError('name argument must be a string'))
910 typeof name
=== 'string' &&
911 name
.trim().length
=== 0
913 reject(new TypeError('name argument must not be an empty string'))
916 if (transferList
!= null && !Array.isArray(transferList
)) {
917 reject(new TypeError('transferList argument must be an array'))
920 const timestamp
= performance
.now()
921 const workerNodeKey
= this.chooseWorkerNode()
922 const task
: Task
<Data
> = {
923 name
: name
?? DEFAULT_TASK_NAME
,
924 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
925 data
: data
?? ({} as Data
),
930 this.promiseResponseMap
.set(task
.taskId
as string, {
936 this.opts
.enableTasksQueue
=== false ||
937 (this.opts
.enableTasksQueue
=== true &&
938 this.shallExecuteTask(workerNodeKey
))
940 this.executeTask(workerNodeKey
, task
)
942 this.enqueueTask(workerNodeKey
, task
)
948 public start (): void {
950 throw new Error('Cannot start an already started pool')
953 throw new Error('Cannot start an already starting pool')
955 if (this.destroying
) {
956 throw new Error('Cannot start a destroying pool')
960 this.workerNodes
.reduce(
961 (accumulator
, workerNode
) =>
962 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
964 ) < this.numberOfWorkers
966 this.createAndSetupWorkerNode()
968 this.starting
= false
973 public async destroy (): Promise
<void> {
975 throw new Error('Cannot destroy an already destroyed pool')
978 throw new Error('Cannot destroy an starting pool')
980 if (this.destroying
) {
981 throw new Error('Cannot destroy an already destroying pool')
983 this.destroying
= true
985 this.workerNodes
.map(async (_
, workerNodeKey
) => {
986 await this.destroyWorkerNode(workerNodeKey
)
989 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
990 this.emitter
?.emitDestroy()
991 this.readyEventEmitted
= false
992 this.destroying
= false
996 protected async sendKillMessageToWorker (
997 workerNodeKey
: number
999 await new Promise
<void>((resolve
, reject
) => {
1000 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
1001 this.checkMessageWorkerId(message
)
1002 if (message
.kill
=== 'success') {
1004 } else if (message
.kill
=== 'failure') {
1007 `Kill message handling failed on worker ${
1008 message.workerId as number
1014 // FIXME: should be registered only once
1015 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1016 this.sendToWorker(workerNodeKey
, { kill
: true })
1021 * Terminates the worker node given its worker node key.
1023 * @param workerNodeKey - The worker node key.
1025 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
1028 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1029 * Can be overridden.
1033 protected setupHook (): void {
1034 /* Intentionally empty */
1038 * Should return whether the worker is the main worker or not.
1040 protected abstract isMain (): boolean
1043 * Hook executed before the worker task execution.
1044 * Can be overridden.
1046 * @param workerNodeKey - The worker node key.
1047 * @param task - The task to execute.
1049 protected beforeTaskExecutionHook (
1050 workerNodeKey
: number,
1053 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1054 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1055 ++workerUsage
.tasks
.executing
1056 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1059 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1060 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1064 const taskFunctionWorkerUsage
= this.workerNodes
[
1066 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1067 ++taskFunctionWorkerUsage
.tasks
.executing
1068 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1073 * Hook executed after the worker task execution.
1074 * Can be overridden.
1076 * @param workerNodeKey - The worker node key.
1077 * @param message - The received message.
1079 protected afterTaskExecutionHook (
1080 workerNodeKey
: number,
1081 message
: MessageValue
<Response
>
1083 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1084 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1085 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1086 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1087 this.updateEluWorkerUsage(workerUsage
, message
)
1090 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1091 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1092 message
.taskPerformance
?.name
as string
1095 const taskFunctionWorkerUsage
= this.workerNodes
[
1097 ].getTaskFunctionWorkerUsage(
1098 message
.taskPerformance
?.name
as string
1100 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1101 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1102 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
1107 * Whether the worker node shall update its task function worker usage or not.
1109 * @param workerNodeKey - The worker node key.
1110 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1112 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1113 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1115 workerInfo
!= null &&
1116 Array.isArray(workerInfo
.taskFunctionNames
) &&
1117 workerInfo
.taskFunctionNames
.length
> 2
1121 private updateTaskStatisticsWorkerUsage (
1122 workerUsage
: WorkerUsage
,
1123 message
: MessageValue
<Response
>
1125 const workerTaskStatistics
= workerUsage
.tasks
1127 workerTaskStatistics
.executing
!= null &&
1128 workerTaskStatistics
.executing
> 0
1130 --workerTaskStatistics
.executing
1132 if (message
.workerError
== null) {
1133 ++workerTaskStatistics
.executed
1135 ++workerTaskStatistics
.failed
1139 private updateRunTimeWorkerUsage (
1140 workerUsage
: WorkerUsage
,
1141 message
: MessageValue
<Response
>
1143 if (message
.workerError
!= null) {
1146 updateMeasurementStatistics(
1147 workerUsage
.runTime
,
1148 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1149 message
.taskPerformance
?.runTime
?? 0
1153 private updateWaitTimeWorkerUsage (
1154 workerUsage
: WorkerUsage
,
1157 const timestamp
= performance
.now()
1158 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1159 updateMeasurementStatistics(
1160 workerUsage
.waitTime
,
1161 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1166 private updateEluWorkerUsage (
1167 workerUsage
: WorkerUsage
,
1168 message
: MessageValue
<Response
>
1170 if (message
.workerError
!= null) {
1173 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1174 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1175 updateMeasurementStatistics(
1176 workerUsage
.elu
.active
,
1177 eluTaskStatisticsRequirements
,
1178 message
.taskPerformance
?.elu
?.active
?? 0
1180 updateMeasurementStatistics(
1181 workerUsage
.elu
.idle
,
1182 eluTaskStatisticsRequirements
,
1183 message
.taskPerformance
?.elu
?.idle
?? 0
1185 if (eluTaskStatisticsRequirements
.aggregate
) {
1186 if (message
.taskPerformance
?.elu
!= null) {
1187 if (workerUsage
.elu
.utilization
!= null) {
1188 workerUsage
.elu
.utilization
=
1189 (workerUsage
.elu
.utilization
+
1190 message
.taskPerformance
.elu
.utilization
) /
1193 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
1200 * Chooses a worker node for the next task.
1202 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1204 * @returns The chosen worker node key.
1206 private chooseWorkerNode (): number {
1207 if (this.shallCreateDynamicWorker()) {
1208 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1210 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1212 return workerNodeKey
1215 return this.workerChoiceStrategyContext
.execute()
1219 * Conditions for dynamic worker creation.
1221 * @returns Whether to create a dynamic worker or not.
1223 private shallCreateDynamicWorker (): boolean {
1224 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
1228 * Sends a message to worker given its worker node key.
1230 * @param workerNodeKey - The worker node key.
1231 * @param message - The message.
1232 * @param transferList - The optional array of transferable objects.
1234 protected abstract sendToWorker (
1235 workerNodeKey
: number,
1236 message
: MessageValue
<Data
>,
1237 transferList
?: TransferListItem
[]
1241 * Creates a new worker.
1243 * @returns Newly created worker.
1245 protected abstract createWorker (): Worker
1248 * Creates a new, completely set up worker node.
1250 * @returns New, completely set up worker node key.
1252 protected createAndSetupWorkerNode (): number {
1253 const worker
= this.createWorker()
1255 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1256 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1257 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1258 worker
.on('error', error
=> {
1259 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1260 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1261 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1262 this.emitter
?.emit(PoolEvents
.error
, error
)
1263 this.workerNodes
[workerNodeKey
].closeChannel()
1268 this.opts
.restartWorkerOnError
=== true
1270 if (workerInfo
.dynamic
) {
1271 this.createAndSetupDynamicWorkerNode()
1273 this.createAndSetupWorkerNode()
1276 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1277 this.redistributeQueuedTasks(workerNodeKey
)
1280 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1281 worker
.once('exit', () => {
1282 this.removeWorkerNode(worker
)
1285 const workerNodeKey
= this.addWorkerNode(worker
)
1287 this.afterWorkerNodeSetup(workerNodeKey
)
1289 return workerNodeKey
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, it listens for kill messages
 * coming from the worker, sends the registered task functions to the worker
 * and flags the worker node as dynamic.
 *
 * @returns The key of the new, completely set up dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    // Resolve the key again: worker node keys shift when nodes are removed.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker:
    // - hard kill: always honored
    // - soft kill: honored only when the worker node has no task executing
    //   and, with tasks queuing enabled, no task queued
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        workerUsage.tasks.executing === 0 &&
        (this.opts.enableTasksQueue === false ||
          (this.opts.enableTasksQueue === true &&
            this.tasksQueueSize(localWorkerNodeKey) === 0)))
    ) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Replay the task functions registered on the pool to the new worker.
  if (this.taskFunctions.size > 0) {
    for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
      this.sendTaskFunctionOperationToWorker(workerNodeKey, {
        taskFunctionOperation: 'add',
        taskFunctionName,
        taskFunction: taskFunction.toString()
      }).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  }
  workerInfo.dynamic = true
  // Some strategy policies want the dynamic worker flagged ready right away.
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
/**
 * Registers a message listener callback on the worker identified by its
 * worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Registers a one-shot message listener callback on the worker identified
 * by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Deregisters a message listener callback from the worker identified by its
 * worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Hook run right after a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener.bind(this)
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Tasks stealing only makes sense when tasks queuing is enabled.
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].addEventListener(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent as EventListener
    )
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].addEventListener(
      'backPressure',
      this.handleBackPressureEvent as EventListener
    )
  }
}
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
/**
 * Sends the statistics message to the worker identified by its worker node
 * key. The message carries the task statistics requirements reported by the
 * worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: taskStatisticsRequirements.runTime.aggregate,
      elu: taskStatisticsRequirements.elu.aggregate
    }
  })
}
/**
 * Moves every task queued on the given worker node to other worker nodes.
 *
 * For each queued task, the destination is the ready worker node having
 * strictly fewer queued tasks than the current candidate, starting from
 * worker node key 0.
 *
 * @param workerNodeKey - The key of the worker node whose queue is drained.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const destinationWorkerNodeKey = this.workerNodes.reduce(
      (leastQueuedKey, candidate, candidateKey, nodes) => {
        const isBetter =
          candidate.info.ready &&
          candidate.usage.tasks.queued <
            nodes[leastQueuedKey].usage.tasks.queued
        return isBetter ? candidateKey : leastQueuedKey
      },
      0
    )
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    // Run the task right away when the destination can take it, else queue it.
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
/**
 * Increments the stolen tasks counter of a worker node and, when task
 * function usage is tracked, of the given task function on that node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.stolen
    }
  }
}
/**
 * Increments the sequentially stolen tasks counter of a worker node and,
 * when task function usage is tracked, of the given task function on that
 * node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.sequentiallyStolen
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
    }
  }
}
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node and,
 * when task function usage is tracked, of the given task function on that
 * node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the task.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    workerNode.usage.tasks.sequentiallyStolen = 0
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
    }
  }
}
/**
 * Handles the idle worker node event: the idle worker node steals a task
 * from another worker node, then the handler re-schedules itself after an
 * exponential delay based on the sequentially stolen tasks count.
 *
 * The cycle stops, and the sequentially stolen counters are reset, once a
 * previously stolen task exists and the worker node has tasks executing or
 * queued again.
 *
 * @param event - The worker node event carrying the worker node key.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no worker node key.
 */
private readonly handleIdleWorkerNodeEvent = (
  event: CustomEvent<WorkerNodeEventDetail>,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = event.detail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey attribute must be defined'
    )
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  const workerNodeIsWorkingAgain =
    previousStolenTask != null &&
    tasksUsage.sequentiallyStolen > 0 &&
    (tasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0)
  if (workerNodeIsWorkingAgain) {
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
      workerNodeKey,
      (previousStolenTask as Task<Data>).name as string
    )
    return
  }
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  // Back off before trying to steal again; failures are deliberately ignored.
  sleep(exponentialDelay(tasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleIdleWorkerNodeEvent(event, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
/**
 * Makes the given worker node steal one task from the ready worker node
 * having the most queued tasks.
 *
 * The stolen task is executed right away on the stealing worker node when
 * it can take it, otherwise it is enqueued on it. The stolen and
 * sequentially stolen statistics of the stealing worker node are updated.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, or `undefined` if no worker node had a task to steal.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  // Sort a copy by queued tasks, descending. Indexes in the sorted copy are
  // NOT worker node keys, so the stealing worker node is excluded by identity
  // and never by index.
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      candidate =>
        candidate.info.ready &&
        candidate !== destinationWorkerNode &&
        candidate.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(workerNodeKey)) {
    this.executeTask(workerNodeKey, task)
  } else {
    this.enqueueTask(workerNodeKey, task)
  }
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
    workerNodeKey,
    task.name as string
  )
  this.updateTaskStolenStatisticsWorkerUsage(
    workerNodeKey,
    task.name as string
  )
  return task
}
/**
 * Handles the back pressure event of a worker node: the other ready worker
 * nodes, least loaded first, each take one task from the worker node under
 * back pressure as long as it still has queued tasks and their own queue is
 * below the configured tasks queue size minus an offset of 1.
 *
 * Each taken task is executed right away when the destination can take it,
 * otherwise enqueued on it, and counted as stolen on the destination.
 *
 * @param event - The worker node event carrying the id of the worker under back pressure.
 */
private readonly handleBackPressureEvent = (
  event: CustomEvent<WorkerNodeEventDetail>
): void => {
  const { workerId } = event.detail
  const sizeOffset = 1
  const tasksQueueSize = this.opts.tasksQueueOptions?.size as number
  // With such a small queue size no worker node can take a task.
  if (tasksQueueSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sort a copy by queued tasks, ascending. Indexes in the sorted copy are
  // NOT worker node keys.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueSize - sizeOffset
    ) {
      // Resolve the real worker node key from the pool array so the task is
      // run, queued and accounted on the worker node actually selected.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
/**
 * Message listener registered on each worker. Dispatches the message
 * according to its content: worker ready response, task execution response
 * or task function names update.
 *
 * @param message - The message received from the worker.
 */
protected workerMessageListener (message: MessageValue<Response>): void {
  this.checkMessageWorkerId(message)
  const hasTaskFunctionNames = message.taskFunctionNames != null
  if (message.ready != null && hasTaskFunctionNames) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (message.taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (hasTaskFunctionNames) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(message.workerId)
    )
    workerInfo.taskFunctionNames = message.taskFunctionNames
  }
}
/**
 * Handles a worker ready response: stores the ready flag and the task
 * function names on the worker info, then emits the pool ready event the
 * first time the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it failed to initialize.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  if (message.ready === false) {
    throw new Error(
      `Worker ${message.workerId as number} failed to initialize`
    )
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  workerInfo.taskFunctionNames = message.taskFunctionNames
  // Emit the pool ready event only once.
  if (!this.readyEventEmitted && this.ready) {
    this.readyEventEmitted = true
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
/**
 * Handles a task execution response: settles the promise bound to the task
 * id, runs the after task execution hook, updates the worker choice
 * strategy and, when tasks queuing is enabled, executes the next queued task
 * or signals that the worker node is idle.
 *
 * Messages whose task id has no pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey } = promiseResponse
  if (workerError == null) {
    resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    reject(workerError.message)
  }
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // Execute the next queued task if the concurrency limit allows it.
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    workerNodeTasksUsage.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  // Nothing executing, nothing queued and no stealing cycle in progress:
  // signal that the worker node is idle.
  if (
    workerNodeTasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    workerNodeTasksUsage.sequentiallyStolen === 0
  ) {
    this.workerNodes[workerNodeKey].dispatchEvent(
      new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
        detail: { workerId: workerId as number, workerNodeKey }
      })
    )
  }
}
/**
 * Emits the pool busy event, with the pool information, when the pool is
 * busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
/**
 * Emits the pool back pressure event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
/**
 * Emits the pool full event, with the pool information, when a dynamic pool
 * is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
/**
 * Gets the worker information of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
/**
 * Wraps the given worker into a worker node and adds it to the pool worker
 * nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Tasks queue size defaults to the square of the pool maximum size.
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const workerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueBackPressureSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
/**
 * Removes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context. Does nothing if the worker
 * has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
/**
 * Flags the worker node identified by its key as not ready.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
/**
 * Whether the worker node identified by its key has back pressure.
 * Always `false` when tasks queuing is not enabled.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if tasks queuing is enabled and the worker node has back pressure, `false` otherwise.
 */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
/**
 * Whether the pool has back pressure, i.e. tasks queuing is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if every worker node has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
/**
 * Executes the given task on the worker node identified by its key: runs
 * the before task execution hook, sends the task with its transfer list to
 * the worker, then checks whether task execution events must be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node identified by its key, then
 * checks whether task queuing events must be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if there is none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
/**
 * Flushes the tasks queue of the worker node identified by its key: every
 * queued task is dequeued and executed on that worker node, then the queue
 * is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1825 private flushTasksQueues (): void {
1826 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1827 this.flushTasksQueue(workerNodeKey
)