1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { TransferListItem
} from
'node:worker_threads'
4 import { EventEmitterAsyncResource
} from
'node:events'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
22 import { KillBehaviors
} from
'../worker/worker-options'
23 import type { TaskFunction
} from
'../worker/task-functions'
31 type TasksQueueOptions
37 WorkerNodeEventDetail
,
42 type MeasurementStatisticsRequirements
,
44 WorkerChoiceStrategies
,
45 type WorkerChoiceStrategy
,
46 type WorkerChoiceStrategyOptions
47 } from
'./selection-strategies/selection-strategies-types'
48 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
49 import { version
} from
'./version'
50 import { WorkerNode
} from
'./worker-node'
53 checkValidTasksQueueOptions
,
54 checkValidWorkerChoiceStrategy
,
55 updateMeasurementStatistics
59 * Base class that implements some shared logic for all poolifier pools.
61 * @typeParam Worker - Type of worker which manages this pool.
62 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
63 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
65 export abstract class AbstractPool
<
66 Worker
extends IWorker
,
69 > implements IPool
<Worker
, Data
, Response
> {
71 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
74 public emitter
?: EventEmitterAsyncResource
77 * Dynamic pool maximum size property placeholder.
79 protected readonly max
?: number
82 * The task execution response promise map:
83 * - `key`: The message id of each submitted task.
84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
88 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
89 new Map
<string, PromiseResponseWrapper
<Response
>>()
92 * Worker choice strategy context referencing a worker choice algorithm implementation.
94 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
105 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
108 * Whether the pool is started or not.
110 private started
: boolean
112 * Whether the pool is starting or not.
114 private starting
: boolean
116 * Whether the pool is destroying or not.
118 private destroying
: boolean
120 * Whether the pool ready event has been emitted or not.
122 private readyEventEmitted
: boolean
124 * The start timestamp of the pool.
126 private readonly startTimestamp
129 * Constructs a new poolifier pool.
131 * @param numberOfWorkers - Number of workers that this pool should manage.
132 * @param filePath - Path to the worker file.
133 * @param opts - Options for the pool.
136 protected readonly numberOfWorkers
: number,
137 protected readonly filePath
: string,
138 protected readonly opts
: PoolOptions
<Worker
>
140 if (!this.isMain()) {
142 'Cannot start a pool from a worker with the same type as the pool'
145 checkFilePath(this.filePath
)
146 this.checkNumberOfWorkers(this.numberOfWorkers
)
147 this.checkPoolOptions(this.opts
)
149 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
150 this.executeTask
= this.executeTask
.bind(this)
151 this.enqueueTask
= this.enqueueTask
.bind(this)
153 if (this.opts
.enableEvents
=== true) {
154 this.initializeEventEmitter()
156 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
162 this.opts
.workerChoiceStrategy
,
163 this.opts
.workerChoiceStrategyOptions
168 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
171 this.starting
= false
172 this.destroying
= false
173 this.readyEventEmitted
= false
174 if (this.opts
.startWorkers
=== true) {
178 this.startTimestamp
= performance
.now()
/**
 * Validates the number of workers requested for this pool.
 *
 * NOTE(review): the error constructors of the first two branches sat on lines
 * that are not visible in the reviewed extract; `Error` and `TypeError` are
 * assumed here - confirm against the original file.
 *
 * @param numberOfWorkers - Number of workers to validate.
 * @throws If the number of workers is missing, is not a safe integer, is negative, or is zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only fixed pools are rejected when sized to zero.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
199 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
200 if (isPlainObject(opts
)) {
201 this.opts
.startWorkers
= opts
.startWorkers
?? true
202 checkValidWorkerChoiceStrategy(
203 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
205 this.opts
.workerChoiceStrategy
=
206 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
207 this.checkValidWorkerChoiceStrategyOptions(
208 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
210 this.opts
.workerChoiceStrategyOptions
= {
211 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
212 ...opts
.workerChoiceStrategyOptions
214 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
215 this.opts
.enableEvents
= opts
.enableEvents
?? true
216 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
217 if (this.opts
.enableTasksQueue
) {
218 checkValidTasksQueueOptions(opts
.tasksQueueOptions
as TasksQueueOptions
)
219 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
220 opts
.tasksQueueOptions
as TasksQueueOptions
224 throw new TypeError('Invalid pool options: must be a plain object')
228 private checkValidWorkerChoiceStrategyOptions (
229 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
232 workerChoiceStrategyOptions
!= null &&
233 !isPlainObject(workerChoiceStrategyOptions
)
236 'Invalid worker choice strategy options: must be a plain object'
240 workerChoiceStrategyOptions
?.retries
!= null &&
241 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
244 'Invalid worker choice strategy options: retries must be an integer'
248 workerChoiceStrategyOptions
?.retries
!= null &&
249 workerChoiceStrategyOptions
.retries
< 0
251 throw new RangeError(
252 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
256 workerChoiceStrategyOptions
?.weights
!= null &&
257 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
260 'Invalid worker choice strategy options: must have a weight for each worker node'
264 workerChoiceStrategyOptions
?.measurement
!= null &&
265 !Object.values(Measurements
).includes(
266 workerChoiceStrategyOptions
.measurement
270 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
275 private initializeEventEmitter (): void {
276 this.emitter
= new EventEmitterAsyncResource({
277 name
: `poolifier:${this.type}-${this.worker}-pool`
282 public get
info (): PoolInfo
{
287 started
: this.started
,
289 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
290 minSize
: this.minSize
,
291 maxSize
: this.maxSize
,
292 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
293 .runTime
.aggregate
&&
294 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
295 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
296 workerNodes
: this.workerNodes
.length
,
297 idleWorkerNodes
: this.workerNodes
.reduce(
298 (accumulator
, workerNode
) =>
299 workerNode
.usage
.tasks
.executing
=== 0
304 busyWorkerNodes
: this.workerNodes
.reduce(
305 (accumulator
, workerNode
) =>
306 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
309 executedTasks
: this.workerNodes
.reduce(
310 (accumulator
, workerNode
) =>
311 accumulator
+ workerNode
.usage
.tasks
.executed
,
314 executingTasks
: this.workerNodes
.reduce(
315 (accumulator
, workerNode
) =>
316 accumulator
+ workerNode
.usage
.tasks
.executing
,
319 ...(this.opts
.enableTasksQueue
=== true && {
320 queuedTasks
: this.workerNodes
.reduce(
321 (accumulator
, workerNode
) =>
322 accumulator
+ workerNode
.usage
.tasks
.queued
,
326 ...(this.opts
.enableTasksQueue
=== true && {
327 maxQueuedTasks
: this.workerNodes
.reduce(
328 (accumulator
, workerNode
) =>
329 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
333 ...(this.opts
.enableTasksQueue
=== true && {
334 backPressure
: this.hasBackPressure()
336 ...(this.opts
.enableTasksQueue
=== true && {
337 stolenTasks
: this.workerNodes
.reduce(
338 (accumulator
, workerNode
) =>
339 accumulator
+ workerNode
.usage
.tasks
.stolen
,
343 failedTasks
: this.workerNodes
.reduce(
344 (accumulator
, workerNode
) =>
345 accumulator
+ workerNode
.usage
.tasks
.failed
,
348 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
349 .runTime
.aggregate
&& {
353 ...this.workerNodes
.map(
354 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
360 ...this.workerNodes
.map(
361 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
365 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
366 .runTime
.average
&& {
369 this.workerNodes
.reduce
<number[]>(
370 (accumulator
, workerNode
) =>
371 accumulator
.concat(workerNode
.usage
.runTime
.history
),
377 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
381 this.workerNodes
.reduce
<number[]>(
382 (accumulator
, workerNode
) =>
383 accumulator
.concat(workerNode
.usage
.runTime
.history
),
391 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
392 .waitTime
.aggregate
&& {
396 ...this.workerNodes
.map(
397 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
403 ...this.workerNodes
.map(
404 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
408 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
409 .waitTime
.average
&& {
412 this.workerNodes
.reduce
<number[]>(
413 (accumulator
, workerNode
) =>
414 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
420 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
421 .waitTime
.median
&& {
424 this.workerNodes
.reduce
<number[]>(
425 (accumulator
, workerNode
) =>
426 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
438 * The pool readiness boolean status.
440 private get
ready (): boolean {
442 this.workerNodes
.reduce(
443 (accumulator
, workerNode
) =>
444 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
453 * The approximate pool utilization.
455 * @returns The pool utilization.
/**
 * The approximate pool utilization.
 *
 * Ratio between the summed aggregated run and wait times of all worker nodes
 * and the pool time capacity (time elapsed since the start timestamp multiplied
 * by the pool maximum size).
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    // A missing measurement contributes nothing to the totals.
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
476 * If it is `'dynamic'`, it provides the `max` property.
478 protected abstract get
type (): PoolType
483 protected abstract get
worker (): WorkerType
486 * The pool minimum size.
488 protected get
minSize (): number {
489 return this.numberOfWorkers
493 * The pool maximum size.
495 protected get
maxSize (): number {
496 return this.max
?? this.numberOfWorkers
500 * Checks if the worker id sent in the received message from a worker is valid.
502 * @param message - The received message.
503 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  if (message.workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
516 * Gets the given worker its worker node key.
518 * @param worker - The worker.
519 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const isSameWorker = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(isSameWorker)
}
528 * Gets the worker node key given its worker id.
530 * @param workerId - The worker id.
531 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
/**
 * Looks up the worker node key of the worker node having the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  const hasWorkerId = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
/**
 * Switches the pool to the given worker choice strategy.
 *
 * The strategy is validated, stored in the pool options and handed to the
 * worker choice strategy context. Every worker node then gets its usage reset
 * and is sent a statistics message.
 *
 * @param workerChoiceStrategy - The worker choice strategy to use.
 * @param workerChoiceStrategyOptions - Optional worker choice strategy options, applied when given.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
/**
 * Validates the given worker choice strategy options, merges them over the
 * defaults, stores the result in the pool options and forwards it to the
 * worker choice strategy context.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const mergedOptions = Object.assign(
    {},
    DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    workerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
/**
 * Enables or disables the worker node tasks queue.
 *
 * When a currently enabled queue gets disabled, task stealing handlers are
 * removed and the queues are flushed before the option is updated.
 *
 * NOTE(review): the `enable` parameter declaration sat on a line not visible in
 * the reviewed extract; its `boolean` type is inferred from its usage.
 *
 * @param enable - Whether to enable the tasks queue or not.
 * @param tasksQueueOptions - Optional tasks queue options.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
/**
 * Applies the given tasks queue options.
 *
 * With the tasks queue disabled, any stored tasks queue options are removed.
 * Otherwise the options are validated, completed with defaults, stored, and the
 * queue size and task stealing handlers are updated accordingly.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
608 private buildTasksQueueOptions (
609 tasksQueueOptions
: TasksQueueOptions
610 ): TasksQueueOptions
{
613 size
: Math.pow(this.maxSize
, 2),
616 tasksStealingOnBackPressure
: true
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The size to assign to each worker node.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
628 private setTaskStealing (): void {
629 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
630 this.workerNodes
[workerNodeKey
].addEventListener(
632 this.handleIdleWorkerNodeEvent
as EventListener
637 private unsetTaskStealing (): void {
638 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
639 this.workerNodes
[workerNodeKey
].removeEventListener(
641 this.handleIdleWorkerNodeEvent
as EventListener
646 private setTasksStealingOnBackPressure (): void {
647 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
648 this.workerNodes
[workerNodeKey
].addEventListener(
650 this.handleBackPressureEvent
as EventListener
655 private unsetTasksStealingOnBackPressure (): void {
656 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
657 this.workerNodes
[workerNodeKey
].removeEventListener(
659 this.handleBackPressureEvent
as EventListener
665 * Whether the pool is full or not.
667 * The pool filling boolean status.
669 protected get
full (): boolean {
670 return this.workerNodes
.length
>= this.maxSize
674 * Whether the pool is busy or not.
676 * The pool busyness boolean status.
678 protected abstract get
busy (): boolean
681 * Whether worker nodes are executing concurrently their tasks quota or not.
683 * @returns Worker nodes busyness boolean status.
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has spare capacity while it
 * executes fewer tasks than the configured concurrency; otherwise while it
 * executes no task at all.
 *
 * NOTE(review): the comparison applied to the `findIndex()` result sat on lines
 * not visible in the reviewed extract; `=== -1` (no worker node with spare
 * capacity) is assumed - confirm against the original file.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const hasSpareCapacity = this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
    return !hasSpareCapacity
  }
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
/**
 * Sends a task function operation message to one worker node and settles once
 * that worker answers with a task function operation status.
 *
 * NOTE(review): the `resolve(true)` / `reject(new Error(...))` calls and the
 * first argument of the listener registration sat on lines not visible in the
 * reviewed extract; they are reconstructed from the surrounding code.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns `true` once the worker reported a successful operation; the promise rejects when it reported a failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const workerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore unrelated messages and responses coming from other workers.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== workerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
/**
 * Sends a task function operation message to every worker node and settles once
 * as many responses as there are worker nodes have been received.
 *
 * The same listener is registered on every worker node. Once the operation is
 * settled it is deregistered from every worker node, not only from the worker
 * that sent the last response, so that no listener is left behind.
 *
 * NOTE(review): the `resolve(true)` / `reject(new Error(...))` calls, part of
 * the error message template and the first argument of the listener
 * registration sat on lines not visible in the reviewed extract; they are
 * reconstructed from the surrounding code and from the single-worker variant.
 *
 * @param message - The task function operation message.
 * @returns `true` once every worker reported a successful operation; the promise rejects when at least one reported a failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      message: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(message)
      if (message.taskFunctionOperationStatus != null) {
        responsesReceived.push(message)
        if (responsesReceived.length === this.workerNodes.length) {
          if (
            responsesReceived.every(
              message => message.taskFunctionOperationStatus === true
            )
          ) {
            resolve(true)
          } else if (
            responsesReceived.some(
              message => message.taskFunctionOperationStatus === false
            )
          ) {
            const errorResponse = responsesReceived.find(
              response => response.taskFunctionOperationStatus === false
            )
            reject(
              new Error(
                `Task function operation '${
                  message.taskFunctionOperation as string
                }' failed on worker ${
                  errorResponse?.workerId as number
                } with error: '${
                  errorResponse?.workerError?.message as string
                }'`
              )
            )
          }
          // The listener was attached to every worker node: detach it everywhere.
          for (const [workerNodeKey] of this.workerNodes.entries()) {
            this.deregisterWorkerMessageListener(
              workerNodeKey,
              taskFunctionOperationsListener
            )
          }
        }
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
/**
 * Tells whether at least one worker node lists the given task function name.
 *
 * NOTE(review): the `return` statements sat on lines not visible in the
 * reviewed extract; `true` on the first match and `false` otherwise is assumed.
 *
 * @param name - The task function name.
 * @returns `true` if a worker node lists the task function name, `false` otherwise.
 */
public hasTaskFunction (name: string): boolean {
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
/**
 * Adds a task function to the workers, then records it on the pool side.
 *
 * The function is sent to the workers as its source text (`fn.toString()`).
 *
 * NOTE(review): the `name` parameter declaration and the final `return` sat on
 * lines not visible in the reviewed extract; returning the operation result is
 * assumed.
 *
 * @param name - The task function name.
 * @param fn - The task function.
 * @returns The result of the operation sent to the workers.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypeError} If `name` is not a string or is an empty string, or if `fn` is not a function.
 */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // `name` is known to be a string from here on.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  this.taskFunctions.set(name, fn)
  return opResult
}
/**
 * Removes a task function previously added on the pool side from the workers,
 * then drops its worker usages and its pool side record.
 *
 * NOTE(review): the error constructor and the final `return` sat on lines not
 * visible in the reviewed extract; `Error` and returning the operation result
 * are assumed.
 *
 * @param name - The task function name.
 * @returns The result of the operation sent to the workers.
 * @throws If the task function is not handled on the pool side.
 */
public async removeTaskFunction (name: string): Promise<boolean> {
  const isHandledOnPoolSide = this.taskFunctions.has(name)
  if (!isHandledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
/**
 * Lists the task function names reported by the first worker node that has a
 * non-empty list of them.
 *
 * NOTE(review): the fallback `return` sat on a line not visible in the reviewed
 * extract; an empty array is assumed.
 *
 * @returns The task function names, or an empty array if no worker node reports any.
 */
public listTaskFunctionNames (): string[] {
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
/**
 * Asks every worker to use the given task function as its default one.
 *
 * @param name - The task function name.
 * @returns The result of the operation sent to the workers.
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return opResult
}
/**
 * Deletes the worker usage tracked for the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
/**
 * Whether a task shall be executed right away on the given worker node instead
 * of being enqueued: its tasks queue must be empty and it must be executing
 * fewer tasks than the configured tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed immediately, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return (
    executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
888 public async execute (
891 transferList
?: TransferListItem
[]
892 ): Promise
<Response
> {
893 return await new Promise
<Response
>((resolve
, reject
) => {
895 reject(new Error('Cannot execute a task on not started pool'))
898 if (this.destroying
) {
899 reject(new Error('Cannot execute a task on destroying pool'))
902 if (name
!= null && typeof name
!== 'string') {
903 reject(new TypeError('name argument must be a string'))
908 typeof name
=== 'string' &&
909 name
.trim().length
=== 0
911 reject(new TypeError('name argument must not be an empty string'))
914 if (transferList
!= null && !Array.isArray(transferList
)) {
915 reject(new TypeError('transferList argument must be an array'))
918 const timestamp
= performance
.now()
919 const workerNodeKey
= this.chooseWorkerNode()
920 const task
: Task
<Data
> = {
921 name
: name
?? DEFAULT_TASK_NAME
,
922 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
923 data
: data
?? ({} as Data
),
928 this.promiseResponseMap
.set(task
.taskId
as string, {
934 this.opts
.enableTasksQueue
=== false ||
935 (this.opts
.enableTasksQueue
=== true &&
936 this.shallExecuteTask(workerNodeKey
))
938 this.executeTask(workerNodeKey
, task
)
940 this.enqueueTask(workerNodeKey
, task
)
/**
 * Starts the pool: creates worker nodes until the number of non dynamic worker
 * nodes reaches the configured number of workers.
 *
 * The `starting` flag is reset even when a worker node creation throws, so
 * that a failed start does not leave the pool permanently reported as
 * "already starting" and `start()` can be called again.
 *
 * NOTE(review): the `started` / `starting` guards, the loop header and the
 * final `started` assignment sat on lines not visible in the reviewed extract;
 * they are reconstructed from the error messages and the visible statements.
 *
 * @throws If the pool is already started, already starting, or being destroyed.
 */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  try {
    while (
      this.workerNodes.reduce(
        (accumulator, workerNode) =>
          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
        0
      ) < this.numberOfWorkers
    ) {
      this.createAndSetupWorkerNode()
    }
  } finally {
    // Never leave the flag set if a worker node creation failed.
    this.starting = false
  }
  this.started = true
}
/**
 * Destroys the pool: terminates every worker node, emits the destroy event and
 * resets the pool state flags.
 *
 * NOTE(review): the `started` / `starting` guards, the `Promise.all()` wrapper
 * and the final `started` assignment sat on lines not visible in the reviewed
 * extract; they are reconstructed from the error messages and the visible
 * statements.
 *
 * @throws If the pool is already destroyed, starting, or already being destroyed.
 */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // Worker node terminations are launched together and awaited as a whole.
  const workerNodeTerminations = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeTerminations)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
994 protected async sendKillMessageToWorker (
995 workerNodeKey
: number
997 await new Promise
<void>((resolve
, reject
) => {
998 const killMessageListener
= (message
: MessageValue
<Response
>): void => {
999 this.checkMessageWorkerId(message
)
1000 if (message
.kill
=== 'success') {
1002 } else if (message
.kill
=== 'failure') {
1005 `Kill message handling failed on worker ${
1006 message.workerId as number
1012 // FIXME: should be registered only once
1013 this.registerWorkerMessageListener(workerNodeKey
, killMessageListener
)
1014 this.sendToWorker(workerNodeKey
, { kill
: true })
1019 * Terminates the worker node given its worker node key.
1021 * @param workerNodeKey - The worker node key.
1023 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
1026 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1027 * Can be overridden.
1031 protected setupHook (): void {
1032 /* Intentionally empty */
1036 * Should return whether the worker is the main worker or not.
1038 protected abstract isMain (): boolean
1041 * Hook executed before the worker task execution.
1042 * Can be overridden.
1044 * @param workerNodeKey - The worker node key.
1045 * @param task - The task to execute.
1047 protected beforeTaskExecutionHook (
1048 workerNodeKey
: number,
1051 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1052 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1053 ++workerUsage
.tasks
.executing
1054 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1057 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1058 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1062 const taskFunctionWorkerUsage
= this.workerNodes
[
1064 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1065 ++taskFunctionWorkerUsage
.tasks
.executing
1066 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1071 * Hook executed after the worker task execution.
1072 * Can be overridden.
1074 * @param workerNodeKey - The worker node key.
1075 * @param message - The received message.
1077 protected afterTaskExecutionHook (
1078 workerNodeKey
: number,
1079 message
: MessageValue
<Response
>
1081 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1082 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1083 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1084 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1085 this.updateEluWorkerUsage(workerUsage
, message
)
1088 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1089 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1090 message
.taskPerformance
?.name
as string
1093 const taskFunctionWorkerUsage
= this.workerNodes
[
1095 ].getTaskFunctionWorkerUsage(
1096 message
.taskPerformance
?.name
as string
1098 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1099 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1100 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
1105 * Whether the worker node shall update its task function worker usage or not.
1107 * @param workerNodeKey - The worker node key.
1108 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * This is the case when the worker info exists and lists more than two task
 * function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return false
  }
  const { taskFunctionNames } = workerInfo
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
/**
 * Updates the task counters of a worker usage from a received task response:
 * one task less executing (never going below zero), and one more executed or
 * failed task depending on whether the response carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
/**
 * Updates the run time measurement statistics of a worker usage from a
 * received task response. Responses carrying a worker error are ignored.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const runTimeRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
/**
 * Updates the wait time measurement statistics of a worker usage with the time
 * elapsed since the task timestamp. A task without timestamp counts for a zero
 * wait time.
 *
 * NOTE(review): the `task` parameter declaration and the last argument of
 * `updateMeasurementStatistics()` sat on lines not visible in the reviewed
 * extract; `Task<Data>` and the computed task wait time are assumed.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const submittedAt = task.timestamp ?? now
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - submittedAt
  )
}
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a
 * received task response. Responses carrying a worker error are ignored.
 *
 * The active and idle measurements are always updated. The utilization is only
 * updated when the ELU aggregate is required and the response carries ELU data:
 * it is averaged with the previous value when there is one, set otherwise.
 *
 * NOTE(review): the divisor of the averaging expression sat on a line not
 * visible in the reviewed extract; `2` (mean of previous and new value) is
 * assumed - confirm against the original file.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    message.taskPerformance?.elu?.idle ?? 0
  )
  if (
    !eluTaskStatisticsRequirements.aggregate ||
    message.taskPerformance?.elu == null
  ) {
    return
  }
  const taskUtilization = message.taskPerformance.elu.utilization
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskUtilization) / 2
      : taskUtilization
}
1198 * Chooses a worker node for the next task.
1200 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1202 * @returns The chosen worker node key
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned directly if the strategy policy allows dynamic worker usage. In
 * every other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const workerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? workerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1217 * Conditions for dynamic worker creation.
1219 * @returns Whether to create a dynamic worker or not.
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full and
 * its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1226 * Sends a message to worker given its worker node key.
1228 * @param workerNodeKey - The worker node key.
1229 * @param message - The message.
1230 * @param transferList - The optional array of transferable objects.
1232 protected abstract sendToWorker (
1233 workerNodeKey
: number,
1234 message
: MessageValue
<Data
>,
1235 transferList
?: TransferListItem
[]
1239 * Creates a new worker.
1241 * @returns Newly created worker.
1243 protected abstract createWorker (): Worker
1246 * Creates a new, completely set up worker node.
1248 * @returns New, completely set up worker node key.
1250 protected createAndSetupWorkerNode (): number {
1251 const worker
= this.createWorker()
1253 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1254 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1255 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1256 worker
.on('error', error
=> {
1257 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1258 this.flagWorkerNodeAsNotReady(workerNodeKey
)
1259 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1260 this.emitter
?.emit(PoolEvents
.error
, error
)
1261 this.workerNodes
[workerNodeKey
].closeChannel()
1266 this.opts
.restartWorkerOnError
=== true
1268 if (workerInfo
.dynamic
) {
1269 this.createAndSetupDynamicWorkerNode()
1271 this.createAndSetupWorkerNode()
1274 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1275 this.redistributeQueuedTasks(workerNodeKey
)
1278 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1279 worker
.once('exit', () => {
1280 this.removeWorkerNode(worker
)
1283 const workerNodeKey
= this.addWorkerNode(worker
)
1285 this.afterWorkerNodeSetup(workerNodeKey
)
1287 return workerNodeKey
1291 * Creates a new, completely set up dynamic worker node.
1293 * @returns New, completely set up dynamic worker node key.
// Creates a worker node through `createAndSetupWorkerNode()`, then applies the
// dynamic-specific setup: a kill message listener, the registration of the pool
// task functions on the worker and the `dynamic` flag on the worker info.
1295 protected createAndSetupDynamicWorkerNode (): number {
1296 const workerNodeKey
= this.createAndSetupWorkerNode()
// Additional message listener for this worker node, reacting to kill messages.
1297 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1298 this.checkMessageWorkerId(message
)
// The worker node key is looked up again when the message arrives instead of
// reusing the key captured at creation time.
// NOTE(review): the argument lines of this call are not visible in this chunk.
1299 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1302 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1303 // Kill message received from worker
// A HARD kill behavior is enough on its own. A SOFT kill behavior additionally
// requires that the worker node executes no task and, when the tasks queue is
// enabled, that its tasks queue is empty as well.
1305 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1306 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1307 ((this.opts
.enableTasksQueue
=== false &&
1308 workerUsage
.tasks
.executing
=== 0) ||
1309 (this.opts
.enableTasksQueue
=== true &&
1310 workerUsage
.tasks
.executing
=== 0 &&
1311 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1313 // Flag the worker node as not ready immediately
1314 this.flagWorkerNodeAsNotReady(localWorkerNodeKey
)
// `destroyWorkerNode()` returns a promise; a rejection is reported on the pool
// emitter as an error event.
1315 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1316 this.emitter
?.emit(PoolEvents
.error
, error
)
1320 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
// NOTE(review): the payload lines of this message are not visible in this chunk.
1321 this.sendToWorker(workerNodeKey
, {
// Every task function held in `this.taskFunctions` is sent to the new worker
// with an 'add' operation, the function being serialized with `toString()`.
1324 if (this.taskFunctions
.size
> 0) {
1325 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1326 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1327 taskFunctionOperation
: 'add',
1329 taskFunction
: taskFunction
.toString()
// Errors are reported on the pool emitter as an error event.
1331 this.emitter
?.emit(PoolEvents
.error
, error
)
1335 workerInfo
.dynamic
= true
// The worker info is flagged ready right away depending on the strategy policy
// (`dynamicWorkerReady` or `dynamicWorkerUsage`).
// NOTE(review): the enclosing `if` lines are not visible in this chunk.
1337 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1338 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1340 workerInfo
.ready
= true
1342 this.checkAndEmitDynamicWorkerCreationEvents()
1343 return workerNodeKey
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, constrained to `Data | Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Registers a one-time message listener callback on the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, constrained to `Data | Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
/**
 * Deregisters a message listener callback from the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, constrained to `Data | Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1386 * Method hooked up after a worker node has been newly created.
1387 * Can be overridden.
1389 * @param workerNodeKey - The newly created worker node key.
// Post-creation hook: registers the pool message listener on the worker node,
// sends it the startup and statistics messages, then wires the task stealing
// event handlers when the tasks queue options ask for them.
1391 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1392 // Listen to worker messages.
1393 this.registerWorkerMessageListener(
// `bind(this)` keeps the pool as `this` when the listener is invoked.
1395 this.workerMessageListener
.bind(this)
1397 // Send the startup message to worker.
1398 this.sendStartupMessageToWorker(workerNodeKey
)
1399 // Send the statistics message to worker.
1400 this.sendStatisticsMessageToWorker(workerNodeKey
)
// Event listeners are only attached when the tasks queue is enabled.
1401 if (this.opts
.enableTasksQueue
=== true) {
// NOTE(review): the event name arguments of both `addEventListener()` calls
// are not visible in this chunk.
1402 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1403 this.workerNodes
[workerNodeKey
].addEventListener(
1405 this.handleIdleWorkerNodeEvent
as EventListener
1408 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1409 this.workerNodes
[workerNodeKey
].addEventListener(
1411 this.handleBackPressureEvent
as EventListener
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1425 * Sends the statistics message to worker given its worker node key.
1427 * @param workerNodeKey - The worker node key.
// Sends a message built from the worker choice strategy task statistics
// requirements to the worker identified by its worker node key.
// NOTE(review): most of the payload lines (property names and the members read
// from `getTaskStatisticsRequirements()`) are not visible in this chunk.
1429 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1430 this.sendToWorker(workerNodeKey
, {
1433 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1435 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
/**
 * Redistributes the tasks queued on the given worker node to the other worker nodes.
 *
 * For each queued task, the search starts from the first worker node that is
 * not the source one, and any other ready worker node with strictly fewer
 * queued tasks is preferred. The task is executed on the selected worker node
 * when `shallExecuteTask()` allows it, enqueued on it otherwise.
 *
 * Nothing is done when the worker node key does not match a worker node or
 * when the pool has no other worker node.
 *
 * @param workerNodeKey - The worker node key whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  // Unknown worker node, or no other worker node to hand the tasks over to.
  if (
    this.workerNodes[workerNodeKey] == null ||
    this.workerNodes.length <= 1
  ) {
    return
  }
  // First candidate key that is not the source worker node.
  const initialWorkerNodeKey = workerNodeKey === 0 ? 1 : 0
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const destinationWorkerNodeKey = this.workerNodes.reduce(
      (minWorkerNodeKey, workerNode, candidateWorkerNodeKey, workerNodes) => {
        // The source worker node is never selected: putting a task back on
        // the queue being drained would keep this loop from terminating.
        return candidateWorkerNodeKey !== workerNodeKey &&
          workerNode.info.ready &&
          workerNode.usage.tasks.queued <
            workerNodes[minWorkerNodeKey].usage.tasks.queued
          ? candidateWorkerNodeKey
          : minWorkerNodeKey
      },
      initialWorkerNodeKey
    )
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
/**
 * Increments the stolen tasks counters of a worker node: the worker node usage
 * one and, when task function usage is tracked for that worker node, the one
 * of the given task function.
 *
 * Nothing is done when the worker node key does not match a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Guard once for both updates instead of guarding only the first one.
  if (workerNode == null) {
    return
  }
  if (workerNode.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup: the result is narrowed by the null check, no assertion needed.
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.stolen
    }
  }
}
/**
 * Handles an idle worker node event by moving one queued task to the worker
 * node whose worker id is carried by the event detail.
 *
 * The task is popped from the ready worker node, other than the destination
 * one, that has the most queued tasks (the first one wins on ties). It is
 * executed on the destination worker node when `shallExecuteTask()` allows it,
 * enqueued on it otherwise, and the stolen tasks statistics are updated.
 *
 * @param event - The worker node event.
 */
private readonly handleIdleWorkerNodeEvent = (
  event: CustomEvent<WorkerNodeEventDetail>
): void => {
  const idleWorkerId = event.detail.workerId
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(idleWorkerId)
  let sourceWorkerNode: IWorkerNode<Worker, Data> | undefined
  for (const workerNode of this.workerNodes) {
    const eligible =
      workerNode.info.ready &&
      workerNode.info.id !== idleWorkerId &&
      workerNode.usage.tasks.queued > 0
    // Strict comparison: on equal queue sizes the earliest worker node is kept.
    if (
      eligible &&
      (sourceWorkerNode == null ||
        workerNode.usage.tasks.queued > sourceWorkerNode.usage.tasks.queued)
    ) {
      sourceWorkerNode = workerNode
    }
  }
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
/**
 * Handles a back pressure event by moving queued tasks away from the worker
 * node whose worker id is carried by the event detail.
 *
 * The other worker nodes are visited from the least to the most queued. While
 * the source worker node still has queued tasks, each visited worker node that
 * is ready and has fewer queued tasks than the tasks queue size minus
 * `sizeOffset` receives one task popped from the source: the task is executed
 * there when `shallExecuteTask()` allows it, enqueued there otherwise, and the
 * stolen tasks statistics of the receiving worker node are updated.
 *
 * @param event - The worker node event.
 */
private readonly handleBackPressureEvent = (
  event: CustomEvent<WorkerNodeEventDetail>
): void => {
  const { workerId } = event.detail
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // The worker id may not match a worker node anymore.
  if (sourceWorkerNode == null) {
    return
  }
  // Sorted copy: the pool worker nodes order must stay untouched.
  const sortedWorkerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of sortedWorkerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      // A position in the sorted copy is not a worker node key: resolve the
      // key from the pool worker nodes themselves.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        continue
      }
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
/**
 * Message listener registered on each worker. Dispatches the message according
 * to the fields it carries: worker ready response, task execution response or
 * task function names update.
 *
 * @param message - The message received from the worker.
 */
protected workerMessageListener (message: MessageValue<Response>): void {
  this.checkMessageWorkerId(message)
  const { ready, taskId, taskFunctionNames, workerId } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
/**
 * Handles a worker ready response: stores the ready flag and the task function
 * names on the worker info, then emits the pool ready event the first time the
 * pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctionNames, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  // `readyEventEmitted` ensures the ready event is emitted only once.
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
/**
 * Handles a task execution response: settles the promise bound to the task id,
 * runs the post-execution bookkeeping, then starts the next queued task of the
 * worker node when the tasks queue is enabled and its concurrency allows it.
 *
 * A response whose task id has no entry in the promise response map is ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey } = promiseResponse
  if (workerError == null) {
    resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    reject(workerError.message)
  }
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallRunQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallRunQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
}
// Emits the `PoolEvents.busy` event with the pool information on the pool
// emitter, when one exists.
// NOTE(review): the guard line between the signature and the emit is not
// visible in this chunk; confirm the condition against the full file.
1614 private checkAndEmitTaskExecutionEvents (): void {
1616 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
/**
 * Emits the pool back pressure event, with the pool information, when
 * `hasBackPressure()` reports back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
// For a dynamic pool, emits the `PoolEvents.full` event with the pool
// information on the pool emitter, when one exists.
// NOTE(review): the guard line between the pool type check and the emit is not
// visible in this chunk; confirm the condition against the full file.
1626 private checkAndEmitDynamicWorkerCreationEvents (): void {
1627 if (this.type === PoolTypes
.dynamic
) {
1629 this.emitter
?.emit(PoolEvents
.full
, this.info
)
/**
 * Gets the information of the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  // Optional chaining: no throw here when the key matches no worker node.
  return workerNode?.info
}
/**
 * Wraps the given worker in a worker node and adds it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Tasks queue size: the configured one, else the square of the pool maximum size.
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, tasksQueueMaxSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
/**
 * Sets the ready flag of the worker information to `false`.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
/**
 * Whether the worker node identified by its key has back pressure. Always
 * `false` when the tasks queue is not enabled.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the tasks queue is enabled and the worker node reports back pressure, `false` otherwise.
 */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no worker
 * node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
/**
 * Executes the given task on the worker identified by its worker node key:
 * runs the pre-execution hook, sends the task to the worker, then checks for
 * task execution events to emit.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // Read after the hook, as the transfer list is only needed for the send.
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
/**
 * Enqueues the given task on the worker node identified by its key, then
 * checks for task queuing events to emit.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
/**
 * Flushes the tasks queue of the worker node identified by its key: every
 * queued task is dequeued and executed on that worker node, then the tasks
 * queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1738 private flushTasksQueues (): void {
1739 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1740 this.flushTasksQueue(workerNodeKey
)