1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import { existsSync
} from
'node:fs'
4 import { type TransferListItem
} from
'node:worker_threads'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
21 updateMeasurementStatistics
23 import { KillBehaviors
} from
'../worker/worker-options'
24 import type { TaskFunction
} from
'../worker/task-functions'
33 type TasksQueueOptions
43 type MeasurementStatisticsRequirements
,
45 WorkerChoiceStrategies
,
46 type WorkerChoiceStrategy
,
47 type WorkerChoiceStrategyOptions
48 } from
'./selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
50 import { version
} from
'./version'
51 import { WorkerNode
} from
'./worker-node'
54 * Base class that implements some shared logic for all poolifier pools.
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
60 export abstract class AbstractPool
<
61 Worker
extends IWorker
,
64 > implements IPool
<Worker
, Data
, Response
> {
66 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
69 public readonly emitter
?: PoolEmitter
72 * The task execution response promise map:
73 * - `key`: The message id of each submitted task.
74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
78 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
79 new Map
<string, PromiseResponseWrapper
<Response
>>()
82 * Worker choice strategy context referencing a worker choice algorithm implementation.
84 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
91 * Dynamic pool maximum size property placeholder.
93 protected readonly max
?: number
96 * The task functions added at runtime map:
97 * - `key`: The task function name.
98 * - `value`: The task function itself.
100 private readonly taskFunctions
: Map
<string, TaskFunction
<Data
, Response
>>
103 * Whether the pool is started or not.
105 private started
: boolean
107 * Whether the pool is starting or not.
109 private starting
: boolean
111 * The start timestamp of the pool.
113 private readonly startTimestamp
116 * Constructs a new poolifier pool.
118 * @param numberOfWorkers - Number of workers that this pool should manage.
119 * @param filePath - Path to the worker file.
120 * @param opts - Options for the pool.
123 protected readonly numberOfWorkers
: number,
124 protected readonly filePath
: string,
125 protected readonly opts
: PoolOptions
<Worker
>
127 if (!this.isMain()) {
129 'Cannot start a pool from a worker with the same type as the pool'
132 this.checkNumberOfWorkers(this.numberOfWorkers
)
133 this.checkFilePath(this.filePath
)
134 this.checkPoolOptions(this.opts
)
136 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
137 this.executeTask
= this.executeTask
.bind(this)
138 this.enqueueTask
= this.enqueueTask
.bind(this)
140 if (this.opts
.enableEvents
=== true) {
141 this.emitter
= new PoolEmitter()
143 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
149 this.opts
.workerChoiceStrategy
,
150 this.opts
.workerChoiceStrategyOptions
155 this.taskFunctions
= new Map
<string, TaskFunction
<Data
, Response
>>()
158 this.starting
= false
159 if (this.opts
.startWorkers
=== true) {
163 this.startTimestamp
= performance
.now()
166 private checkFilePath (filePath
: string): void {
169 typeof filePath
!== 'string' ||
170 (typeof filePath
=== 'string' && filePath
.trim().length
=== 0)
172 throw new Error('Please specify a file with a worker implementation')
174 if (!existsSync(filePath
)) {
175 throw new Error(`Cannot find the worker file '${filePath}'`)
179 private checkNumberOfWorkers (numberOfWorkers
: number): void {
180 if (numberOfWorkers
== null) {
182 'Cannot instantiate a pool without specifying the number of workers'
184 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
186 'Cannot instantiate a pool with a non safe integer number of workers'
188 } else if (numberOfWorkers
< 0) {
189 throw new RangeError(
190 'Cannot instantiate a pool with a negative number of workers'
192 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
193 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
197 protected checkDynamicPoolSize (min
: number, max
: number): void {
198 if (this.type === PoolTypes
.dynamic
) {
201 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
203 } else if (!Number.isSafeInteger(max
)) {
205 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
207 } else if (min
> max
) {
208 throw new RangeError(
209 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
211 } else if (max
=== 0) {
212 throw new RangeError(
213 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
215 } else if (min
=== max
) {
216 throw new RangeError(
217 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
223 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
224 if (isPlainObject(opts
)) {
225 this.opts
.startWorkers
= opts
.startWorkers
?? true
226 this.checkValidWorkerChoiceStrategy(
227 opts
.workerChoiceStrategy
as WorkerChoiceStrategy
229 this.opts
.workerChoiceStrategy
=
230 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
231 this.checkValidWorkerChoiceStrategyOptions(
232 opts
.workerChoiceStrategyOptions
as WorkerChoiceStrategyOptions
234 this.opts
.workerChoiceStrategyOptions
= {
235 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
236 ...opts
.workerChoiceStrategyOptions
238 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
239 this.opts
.enableEvents
= opts
.enableEvents
?? true
240 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
241 if (this.opts
.enableTasksQueue
) {
242 this.checkValidTasksQueueOptions(
243 opts
.tasksQueueOptions
as TasksQueueOptions
245 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
246 opts
.tasksQueueOptions
as TasksQueueOptions
250 throw new TypeError('Invalid pool options: must be a plain object')
254 private checkValidWorkerChoiceStrategy (
255 workerChoiceStrategy
: WorkerChoiceStrategy
258 workerChoiceStrategy
!= null &&
259 !Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)
262 `Invalid worker choice strategy '${workerChoiceStrategy}'`
267 private checkValidWorkerChoiceStrategyOptions (
268 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
271 workerChoiceStrategyOptions
!= null &&
272 !isPlainObject(workerChoiceStrategyOptions
)
275 'Invalid worker choice strategy options: must be a plain object'
279 workerChoiceStrategyOptions
?.retries
!= null &&
280 !Number.isSafeInteger(workerChoiceStrategyOptions
.retries
)
283 'Invalid worker choice strategy options: retries must be an integer'
287 workerChoiceStrategyOptions
?.retries
!= null &&
288 workerChoiceStrategyOptions
.retries
< 0
290 throw new RangeError(
291 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
295 workerChoiceStrategyOptions
?.weights
!= null &&
296 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
299 'Invalid worker choice strategy options: must have a weight for each worker node'
303 workerChoiceStrategyOptions
?.measurement
!= null &&
304 !Object.values(Measurements
).includes(
305 workerChoiceStrategyOptions
.measurement
309 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
314 private checkValidTasksQueueOptions (
315 tasksQueueOptions
: TasksQueueOptions
317 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
318 throw new TypeError('Invalid tasks queue options: must be a plain object')
321 tasksQueueOptions
?.concurrency
!= null &&
322 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
325 'Invalid worker node tasks concurrency: must be an integer'
329 tasksQueueOptions
?.concurrency
!= null &&
330 tasksQueueOptions
.concurrency
<= 0
332 throw new RangeError(
333 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
337 tasksQueueOptions
?.size
!= null &&
338 !Number.isSafeInteger(tasksQueueOptions
.size
)
341 'Invalid worker node tasks queue size: must be an integer'
344 if (tasksQueueOptions
?.size
!= null && tasksQueueOptions
.size
<= 0) {
345 throw new RangeError(
346 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
352 public get
info (): PoolInfo
{
357 started
: this.started
,
359 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
360 minSize
: this.minSize
,
361 maxSize
: this.maxSize
,
362 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
363 .runTime
.aggregate
&&
364 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
365 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
366 workerNodes
: this.workerNodes
.length
,
367 idleWorkerNodes
: this.workerNodes
.reduce(
368 (accumulator
, workerNode
) =>
369 workerNode
.usage
.tasks
.executing
=== 0
374 busyWorkerNodes
: this.workerNodes
.reduce(
375 (accumulator
, workerNode
) =>
376 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
379 executedTasks
: this.workerNodes
.reduce(
380 (accumulator
, workerNode
) =>
381 accumulator
+ workerNode
.usage
.tasks
.executed
,
384 executingTasks
: this.workerNodes
.reduce(
385 (accumulator
, workerNode
) =>
386 accumulator
+ workerNode
.usage
.tasks
.executing
,
389 ...(this.opts
.enableTasksQueue
=== true && {
390 queuedTasks
: this.workerNodes
.reduce(
391 (accumulator
, workerNode
) =>
392 accumulator
+ workerNode
.usage
.tasks
.queued
,
396 ...(this.opts
.enableTasksQueue
=== true && {
397 maxQueuedTasks
: this.workerNodes
.reduce(
398 (accumulator
, workerNode
) =>
399 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
403 ...(this.opts
.enableTasksQueue
=== true && {
404 backPressure
: this.hasBackPressure()
406 ...(this.opts
.enableTasksQueue
=== true && {
407 stolenTasks
: this.workerNodes
.reduce(
408 (accumulator
, workerNode
) =>
409 accumulator
+ workerNode
.usage
.tasks
.stolen
,
413 failedTasks
: this.workerNodes
.reduce(
414 (accumulator
, workerNode
) =>
415 accumulator
+ workerNode
.usage
.tasks
.failed
,
418 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
419 .runTime
.aggregate
&& {
423 ...this.workerNodes
.map(
424 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
430 ...this.workerNodes
.map(
431 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
435 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
436 .runTime
.average
&& {
439 this.workerNodes
.reduce
<number[]>(
440 (accumulator
, workerNode
) =>
441 accumulator
.concat(workerNode
.usage
.runTime
.history
),
447 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
451 this.workerNodes
.reduce
<number[]>(
452 (accumulator
, workerNode
) =>
453 accumulator
.concat(workerNode
.usage
.runTime
.history
),
461 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
462 .waitTime
.aggregate
&& {
466 ...this.workerNodes
.map(
467 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
473 ...this.workerNodes
.map(
474 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
478 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
479 .waitTime
.average
&& {
482 this.workerNodes
.reduce
<number[]>(
483 (accumulator
, workerNode
) =>
484 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
490 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
491 .waitTime
.median
&& {
494 this.workerNodes
.reduce
<number[]>(
495 (accumulator
, workerNode
) =>
496 accumulator
.concat(workerNode
.usage
.waitTime
.history
),
508 * The pool readiness boolean status.
510 private get
ready (): boolean {
512 this.workerNodes
.reduce(
513 (accumulator
, workerNode
) =>
514 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
523 * The approximate pool utilization.
525 * @returns The pool utilization.
527 private get
utilization (): number {
528 const poolTimeCapacity
=
529 (performance
.now() - this.startTimestamp
) * this.maxSize
530 const totalTasksRunTime
= this.workerNodes
.reduce(
531 (accumulator
, workerNode
) =>
532 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
535 const totalTasksWaitTime
= this.workerNodes
.reduce(
536 (accumulator
, workerNode
) =>
537 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
540 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
546 * If it is `'dynamic'`, it provides the `max` property.
548 protected abstract get
type (): PoolType
553 protected abstract get
worker (): WorkerType
556 * The pool minimum size.
558 protected get
minSize (): number {
559 return this.numberOfWorkers
563 * The pool maximum size.
565 protected get
maxSize (): number {
566 return this.max
?? this.numberOfWorkers
570 * Checks if the worker id sent in the received message from a worker is valid.
572 * @param message - The received message.
573 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
575 private checkMessageWorkerId (message
: MessageValue
<Response
>): void {
576 if (message
.workerId
== null) {
577 throw new Error('Worker message received without worker id')
579 message
.workerId
!= null &&
580 this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1
583 `Worker message received from unknown worker '${message.workerId}'`
589 * Gets the given worker its worker node key.
591 * @param worker - The worker.
592 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
594 private getWorkerNodeKeyByWorker (worker
: Worker
): number {
595 return this.workerNodes
.findIndex(
596 workerNode
=> workerNode
.worker
=== worker
601 * Gets the worker node key given its worker id.
603 * @param workerId - The worker id.
604 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
606 private getWorkerNodeKeyByWorkerId (workerId
: number | undefined): number {
607 return this.workerNodes
.findIndex(
608 workerNode
=> workerNode
.info
.id
=== workerId
613 public setWorkerChoiceStrategy (
614 workerChoiceStrategy
: WorkerChoiceStrategy
,
615 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
617 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
618 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
619 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
620 this.opts
.workerChoiceStrategy
622 if (workerChoiceStrategyOptions
!= null) {
623 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
625 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
626 workerNode
.resetUsage()
627 this.sendStatisticsMessageToWorker(workerNodeKey
)
632 public setWorkerChoiceStrategyOptions (
633 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
635 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
636 this.opts
.workerChoiceStrategyOptions
= {
637 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
638 ...workerChoiceStrategyOptions
640 this.workerChoiceStrategyContext
.setOptions(
641 this.opts
.workerChoiceStrategyOptions
646 public enableTasksQueue (
648 tasksQueueOptions
?: TasksQueueOptions
650 if (this.opts
.enableTasksQueue
=== true && !enable
) {
651 this.unsetTaskStealing()
652 this.unsetTasksStealingOnBackPressure()
653 this.flushTasksQueues()
655 this.opts
.enableTasksQueue
= enable
656 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
660 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
661 if (this.opts
.enableTasksQueue
=== true) {
662 this.checkValidTasksQueueOptions(tasksQueueOptions
)
663 this.opts
.tasksQueueOptions
=
664 this.buildTasksQueueOptions(tasksQueueOptions
)
665 this.setTasksQueueSize(this.opts
.tasksQueueOptions
.size
as number)
666 if (this.opts
.tasksQueueOptions
.taskStealing
=== true) {
667 this.setTaskStealing()
669 this.unsetTaskStealing()
671 if (this.opts
.tasksQueueOptions
.tasksStealingOnBackPressure
=== true) {
672 this.setTasksStealingOnBackPressure()
674 this.unsetTasksStealingOnBackPressure()
676 } else if (this.opts
.tasksQueueOptions
!= null) {
677 delete this.opts
.tasksQueueOptions
681 private buildTasksQueueOptions (
682 tasksQueueOptions
: TasksQueueOptions
683 ): TasksQueueOptions
{
686 size
: Math.pow(this.maxSize
, 2),
689 tasksStealingOnBackPressure
: true
695 private setTasksQueueSize (size
: number): void {
696 for (const workerNode
of this.workerNodes
) {
697 workerNode
.tasksQueueBackPressureSize
= size
701 private setTaskStealing (): void {
702 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
703 this.workerNodes
[workerNodeKey
].onEmptyQueue
=
704 this.taskStealingOnEmptyQueue
.bind(this)
708 private unsetTaskStealing (): void {
709 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
710 delete this.workerNodes
[workerNodeKey
].onEmptyQueue
714 private setTasksStealingOnBackPressure (): void {
715 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
716 this.workerNodes
[workerNodeKey
].onBackPressure
=
717 this.tasksStealingOnBackPressure
.bind(this)
721 private unsetTasksStealingOnBackPressure (): void {
722 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
723 delete this.workerNodes
[workerNodeKey
].onBackPressure
728 * Whether the pool is full or not.
730 * The pool filling boolean status.
732 protected get
full (): boolean {
733 return this.workerNodes
.length
>= this.maxSize
737 * Whether the pool is busy or not.
739 * The pool busyness boolean status.
741 protected abstract get
busy (): boolean
744 * Whether worker nodes are executing concurrently their tasks quota or not.
746 * @returns Worker nodes busyness boolean status.
748 protected internalBusy (): boolean {
749 if (this.opts
.enableTasksQueue
=== true) {
751 this.workerNodes
.findIndex(
753 workerNode
.info
.ready
&&
754 workerNode
.usage
.tasks
.executing
<
755 (this.opts
.tasksQueueOptions
?.concurrency
as number)
760 this.workerNodes
.findIndex(
762 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
767 private async sendTaskFunctionOperationToWorker (
768 workerNodeKey
: number,
769 message
: MessageValue
<Data
>
770 ): Promise
<boolean> {
771 return await new Promise
<boolean>((resolve
, reject
) => {
772 const workerId
= this.getWorkerInfo(workerNodeKey
).id
as number
773 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
775 message
.workerId
=== workerId
&&
776 message
.taskFunctionOperationStatus
=== true
780 message
.workerId
=== workerId
&&
781 message
.taskFunctionOperationStatus
=== false
785 `Task function operation '${
786 message.taskFunctionOperation as string
787 }' failed on worker ${message.workerId} with error: '${
788 message.workerError?.message as string
794 this.sendToWorker(workerNodeKey
, message
)
798 private async sendTaskFunctionOperationToWorkers (
799 message
: MessageValue
<Data
>
800 ): Promise
<boolean> {
801 return await new Promise
<boolean>((resolve
, reject
) => {
802 const responsesReceived
= new Array<MessageValue
<Data
| Response
>>()
803 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
804 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
805 if (message
.taskFunctionOperationStatus
!= null) {
806 responsesReceived
.push(message
)
808 responsesReceived
.length
=== this.workerNodes
.length
&&
809 responsesReceived
.every(
810 message
=> message
.taskFunctionOperationStatus
=== true
815 responsesReceived
.length
=== this.workerNodes
.length
&&
816 responsesReceived
.some(
817 message
=> message
.taskFunctionOperationStatus
=== false
820 const errorResponse
= responsesReceived
.find(
821 response
=> response
.taskFunctionOperationStatus
=== false
825 `Task function operation '${
826 message.taskFunctionOperation as string
827 }' failed on worker ${
828 errorResponse?.workerId as number
830 errorResponse?.workerError?.message as string
837 this.sendToWorker(workerNodeKey
, message
)
843 public hasTaskFunction (name
: string): boolean {
844 for (const workerNode
of this.workerNodes
) {
846 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
847 workerNode
.info
.taskFunctionNames
.includes(name
)
856 public async addTaskFunction (
858 fn
: TaskFunction
<Data
, Response
>
859 ): Promise
<boolean> {
860 if (typeof name
!== 'string') {
861 throw new TypeError('name argument must be a string')
863 if (typeof name
=== 'string' && name
.trim().length
=== 0) {
864 throw new TypeError('name argument must not be an empty string')
866 if (typeof fn
!== 'function') {
867 throw new TypeError('fn argument must be a function')
869 const opResult
= await this.sendTaskFunctionOperationToWorkers({
870 taskFunctionOperation
: 'add',
871 taskFunctionName
: name
,
872 taskFunction
: fn
.toString()
874 this.taskFunctions
.set(name
, fn
)
879 public async removeTaskFunction (name
: string): Promise
<boolean> {
880 if (!this.taskFunctions
.has(name
)) {
882 'Cannot remove a task function not handled on the pool side'
885 const opResult
= await this.sendTaskFunctionOperationToWorkers({
886 taskFunctionOperation
: 'remove',
887 taskFunctionName
: name
889 this.deleteTaskFunctionWorkerUsages(name
)
890 this.taskFunctions
.delete(name
)
895 public listTaskFunctionNames (): string[] {
896 for (const workerNode
of this.workerNodes
) {
898 Array.isArray(workerNode
.info
.taskFunctionNames
) &&
899 workerNode
.info
.taskFunctionNames
.length
> 0
901 return workerNode
.info
.taskFunctionNames
908 public async setDefaultTaskFunction (name
: string): Promise
<boolean> {
909 return await this.sendTaskFunctionOperationToWorkers({
910 taskFunctionOperation
: 'default',
911 taskFunctionName
: name
915 private deleteTaskFunctionWorkerUsages (name
: string): void {
916 for (const workerNode
of this.workerNodes
) {
917 workerNode
.deleteTaskFunctionWorkerUsage(name
)
921 private shallExecuteTask (workerNodeKey
: number): boolean {
923 this.tasksQueueSize(workerNodeKey
) === 0 &&
924 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
925 (this.opts
.tasksQueueOptions
?.concurrency
as number)
930 public async execute (
933 transferList
?: TransferListItem
[]
934 ): Promise
<Response
> {
935 return await new Promise
<Response
>((resolve
, reject
) => {
937 reject(new Error('Cannot execute a task on not started pool'))
940 if (name
!= null && typeof name
!== 'string') {
941 reject(new TypeError('name argument must be a string'))
946 typeof name
=== 'string' &&
947 name
.trim().length
=== 0
949 reject(new TypeError('name argument must not be an empty string'))
952 if (transferList
!= null && !Array.isArray(transferList
)) {
953 reject(new TypeError('transferList argument must be an array'))
956 const timestamp
= performance
.now()
957 const workerNodeKey
= this.chooseWorkerNode()
958 const task
: Task
<Data
> = {
959 name
: name
?? DEFAULT_TASK_NAME
,
960 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
961 data
: data
?? ({} as Data
),
966 this.promiseResponseMap
.set(task
.taskId
as string, {
972 this.opts
.enableTasksQueue
=== false ||
973 (this.opts
.enableTasksQueue
=== true &&
974 this.shallExecuteTask(workerNodeKey
))
976 this.executeTask(workerNodeKey
, task
)
978 this.enqueueTask(workerNodeKey
, task
)
984 public start (): void {
987 this.workerNodes
.reduce(
988 (accumulator
, workerNode
) =>
989 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
991 ) < this.numberOfWorkers
993 this.createAndSetupWorkerNode()
995 this.starting
= false
1000 public async destroy (): Promise
<void> {
1002 this.workerNodes
.map(async (_
, workerNodeKey
) => {
1003 await this.destroyWorkerNode(workerNodeKey
)
1006 this.emitter
?.emit(PoolEvents
.destroy
, this.info
)
1007 this.started
= false
1010 protected async sendKillMessageToWorker (
1011 workerNodeKey
: number
1013 await new Promise
<void>((resolve
, reject
) => {
1014 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1015 if (message
.kill
=== 'success') {
1017 } else if (message
.kill
=== 'failure') {
1021 message.workerId as number
1022 } kill message handling failed`
1027 this.sendToWorker(workerNodeKey
, { kill
: true })
1032 * Terminates the worker node given its worker node key.
1034 * @param workerNodeKey - The worker node key.
1036 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
1039 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1040 * Can be overridden.
1044 protected setupHook (): void {
1045 /* Intentionally empty */
1049 * Should return whether the worker is the main worker or not.
1051 protected abstract isMain (): boolean
1054 * Hook executed before the worker task execution.
1055 * Can be overridden.
1057 * @param workerNodeKey - The worker node key.
1058 * @param task - The task to execute.
1060 protected beforeTaskExecutionHook (
1061 workerNodeKey
: number,
1064 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1065 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1066 ++workerUsage
.tasks
.executing
1067 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
1070 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1071 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1075 const taskFunctionWorkerUsage
= this.workerNodes
[
1077 ].getTaskFunctionWorkerUsage(task
.name
as string) as WorkerUsage
1078 ++taskFunctionWorkerUsage
.tasks
.executing
1079 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage
, task
)
1084 * Hook executed after the worker task execution.
1085 * Can be overridden.
1087 * @param workerNodeKey - The worker node key.
1088 * @param message - The received message.
1090 protected afterTaskExecutionHook (
1091 workerNodeKey
: number,
1092 message
: MessageValue
<Response
>
1094 if (this.workerNodes
[workerNodeKey
]?.usage
!= null) {
1095 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
1096 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
1097 this.updateRunTimeWorkerUsage(workerUsage
, message
)
1098 this.updateEluWorkerUsage(workerUsage
, message
)
1101 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1102 this.workerNodes
[workerNodeKey
].getTaskFunctionWorkerUsage(
1103 message
.taskPerformance
?.name
as string
1106 const taskFunctionWorkerUsage
= this.workerNodes
[
1108 ].getTaskFunctionWorkerUsage(
1109 message
.taskPerformance
?.name
as string
1111 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage
, message
)
1112 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage
, message
)
1113 this.updateEluWorkerUsage(taskFunctionWorkerUsage
, message
)
1118 * Whether the worker node shall update its task function worker usage or not.
1120 * @param workerNodeKey - The worker node key.
1121 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1123 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey
: number): boolean {
1124 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1126 workerInfo
!= null &&
1127 Array.isArray(workerInfo
.taskFunctionNames
) &&
1128 workerInfo
.taskFunctionNames
.length
> 2
1132 private updateTaskStatisticsWorkerUsage (
1133 workerUsage
: WorkerUsage
,
1134 message
: MessageValue
<Response
>
1136 const workerTaskStatistics
= workerUsage
.tasks
1138 workerTaskStatistics
.executing
!= null &&
1139 workerTaskStatistics
.executing
> 0
1141 --workerTaskStatistics
.executing
1143 if (message
.workerError
== null) {
1144 ++workerTaskStatistics
.executed
1146 ++workerTaskStatistics
.failed
1150 private updateRunTimeWorkerUsage (
1151 workerUsage
: WorkerUsage
,
1152 message
: MessageValue
<Response
>
1154 if (message
.workerError
!= null) {
1157 updateMeasurementStatistics(
1158 workerUsage
.runTime
,
1159 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
1160 message
.taskPerformance
?.runTime
?? 0
1164 private updateWaitTimeWorkerUsage (
1165 workerUsage
: WorkerUsage
,
1168 const timestamp
= performance
.now()
1169 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
1170 updateMeasurementStatistics(
1171 workerUsage
.waitTime
,
1172 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
1177 private updateEluWorkerUsage (
1178 workerUsage
: WorkerUsage
,
1179 message
: MessageValue
<Response
>
1181 if (message
.workerError
!= null) {
1184 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
1185 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
1186 updateMeasurementStatistics(
1187 workerUsage
.elu
.active
,
1188 eluTaskStatisticsRequirements
,
1189 message
.taskPerformance
?.elu
?.active
?? 0
1191 updateMeasurementStatistics(
1192 workerUsage
.elu
.idle
,
1193 eluTaskStatisticsRequirements
,
1194 message
.taskPerformance
?.elu
?.idle
?? 0
1196 if (eluTaskStatisticsRequirements
.aggregate
) {
1197 if (message
.taskPerformance
?.elu
!= null) {
1198 if (workerUsage
.elu
.utilization
!= null) {
1199 workerUsage
.elu
.utilization
=
1200 (workerUsage
.elu
.utilization
+
1201 message
.taskPerformance
.elu
.utilization
) /
1204 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
1211 * Chooses a worker node for the next task.
1213 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1215 * @returns The chosen worker node key
1217 private chooseWorkerNode (): number {
1218 if (this.shallCreateDynamicWorker()) {
1219 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
1221 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1223 return workerNodeKey
1226 return this.workerChoiceStrategyContext
.execute()
1230 * Conditions for dynamic worker creation.
1232 * @returns Whether to create a dynamic worker or not.
1234 private shallCreateDynamicWorker (): boolean {
1235 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
1239 * Sends a message to worker given its worker node key.
1241 * @param workerNodeKey - The worker node key.
1242 * @param message - The message.
1243 * @param transferList - The optional array of transferable objects.
1245 protected abstract sendToWorker (
1246 workerNodeKey
: number,
1247 message
: MessageValue
<Data
>,
1248 transferList
?: TransferListItem
[]
1252 * Creates a new worker.
1254 * @returns Newly created worker.
1256 protected abstract createWorker (): Worker
1259 * Creates a new, completely set up worker node.
1261 * @returns New, completely set up worker node key.
1263 protected createAndSetupWorkerNode (): number {
1264 const worker
= this.createWorker()
1266 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
1267 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
1268 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
1269 worker
.on('error', error
=> {
1270 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1271 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1272 workerInfo
.ready
= false
1273 this.workerNodes
[workerNodeKey
].closeChannel()
1274 this.emitter
?.emit(PoolEvents
.error
, error
)
1278 this.opts
.restartWorkerOnError
=== true
1280 if (workerInfo
.dynamic
) {
1281 this.createAndSetupDynamicWorkerNode()
1283 this.createAndSetupWorkerNode()
1286 if (this.started
&& this.opts
.enableTasksQueue
=== true) {
1287 this.redistributeQueuedTasks(workerNodeKey
)
1290 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
1291 worker
.once('exit', () => {
1292 this.removeWorkerNode(worker
)
1295 const workerNodeKey
= this.addWorkerNode(worker
)
1297 this.afterWorkerNodeSetup(workerNodeKey
)
1299 return workerNodeKey
1303 * Creates a new, completely set up dynamic worker node.
1305 * @returns New, completely set up dynamic worker node key.
1307 protected createAndSetupDynamicWorkerNode (): number {
1308 const workerNodeKey
= this.createAndSetupWorkerNode()
1309 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
1310 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
1313 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
1314 // Kill message received from worker
1316 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
1317 (isKillBehavior(KillBehaviors
.SOFT
, message
.kill
) &&
1318 ((this.opts
.enableTasksQueue
=== false &&
1319 workerUsage
.tasks
.executing
=== 0) ||
1320 (this.opts
.enableTasksQueue
=== true &&
1321 workerUsage
.tasks
.executing
=== 0 &&
1322 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
1324 this.destroyWorkerNode(localWorkerNodeKey
).catch(error
=> {
1325 this.emitter
?.emit(PoolEvents
.error
, error
)
1329 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
1330 this.sendToWorker(workerNodeKey
, {
1333 if (this.taskFunctions
.size
> 0) {
1334 for (const [taskFunctionName
, taskFunction
] of this.taskFunctions
) {
1335 this.sendTaskFunctionOperationToWorker(workerNodeKey
, {
1336 taskFunctionOperation
: 'add',
1338 taskFunction
: taskFunction
.toString()
1340 this.emitter
?.emit(PoolEvents
.error
, error
)
1344 workerInfo
.dynamic
= true
1346 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerReady
||
1347 this.workerChoiceStrategyContext
.getStrategyPolicy().dynamicWorkerUsage
1349 workerInfo
.ready
= true
1351 this.checkAndEmitDynamicWorkerCreationEvents()
1352 return workerNodeKey
1356 * Registers a listener callback on the worker given its worker node key.
1358 * @param workerNodeKey - The worker node key.
1359 * @param listener - The message listener callback.
1361 protected abstract registerWorkerMessageListener
<
1362 Message
extends Data
| Response
1364 workerNodeKey
: number,
1365 listener
: (message
: MessageValue
<Message
>) => void
1369 * Method hooked up after a worker node has been newly created.
1370 * Can be overridden.
1372 * @param workerNodeKey - The newly created worker node key.
1374 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
1375 // Listen to worker messages.
1376 this.registerWorkerMessageListener(workerNodeKey
, this.workerListener())
1377 // Send the startup message to worker.
1378 this.sendStartupMessageToWorker(workerNodeKey
)
1379 // Send the statistics message to worker.
1380 this.sendStatisticsMessageToWorker(workerNodeKey
)
1381 if (this.opts
.enableTasksQueue
=== true) {
1382 if (this.opts
.tasksQueueOptions
?.taskStealing
=== true) {
1383 this.workerNodes
[workerNodeKey
].onEmptyQueue
=
1384 this.taskStealingOnEmptyQueue
.bind(this)
1386 if (this.opts
.tasksQueueOptions
?.tasksStealingOnBackPressure
=== true) {
1387 this.workerNodes
[workerNodeKey
].onBackPressure
=
1388 this.tasksStealingOnBackPressure
.bind(this)
1394 * Sends the startup message to worker given its worker node key.
1396 * @param workerNodeKey - The worker node key.
1398 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
1401 * Sends the statistics message to worker given its worker node key.
1403 * @param workerNodeKey - The worker node key.
1405 private sendStatisticsMessageToWorker (workerNodeKey
: number): void {
1406 this.sendToWorker(workerNodeKey
, {
1409 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1411 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1417 private redistributeQueuedTasks (workerNodeKey
: number): void {
1418 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1419 const destinationWorkerNodeKey
= this.workerNodes
.reduce(
1420 (minWorkerNodeKey
, workerNode
, workerNodeKey
, workerNodes
) => {
1421 return workerNode
.info
.ready
&&
1422 workerNode
.usage
.tasks
.queued
<
1423 workerNodes
[minWorkerNodeKey
].usage
.tasks
.queued
1429 const task
= this.dequeueTask(workerNodeKey
) as Task
<Data
>
1430 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1431 this.executeTask(destinationWorkerNodeKey
, task
)
1433 this.enqueueTask(destinationWorkerNodeKey
, task
)
1438 private updateTaskStolenStatisticsWorkerUsage (
1439 workerNodeKey
: number,
1442 const workerNode
= this.workerNodes
[workerNodeKey
]
1443 if (workerNode
?.usage
!= null) {
1444 ++workerNode
.usage
.tasks
.stolen
1447 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey
) &&
1448 workerNode
.getTaskFunctionWorkerUsage(taskName
) != null
1450 const taskFunctionWorkerUsage
= workerNode
.getTaskFunctionWorkerUsage(
1453 ++taskFunctionWorkerUsage
.tasks
.stolen
1457 private taskStealingOnEmptyQueue (workerId
: number): void {
1458 const destinationWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(workerId
)
1459 const workerNodes
= this.workerNodes
1462 (workerNodeA
, workerNodeB
) =>
1463 workerNodeB
.usage
.tasks
.queued
- workerNodeA
.usage
.tasks
.queued
1465 const sourceWorkerNode
= workerNodes
.find(
1467 workerNode
.info
.ready
&&
1468 workerNode
.info
.id
!== workerId
&&
1469 workerNode
.usage
.tasks
.queued
> 0
1471 if (sourceWorkerNode
!= null) {
1472 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1473 if (this.shallExecuteTask(destinationWorkerNodeKey
)) {
1474 this.executeTask(destinationWorkerNodeKey
, task
)
1476 this.enqueueTask(destinationWorkerNodeKey
, task
)
1478 this.updateTaskStolenStatisticsWorkerUsage(
1479 destinationWorkerNodeKey
,
1485 private tasksStealingOnBackPressure (workerId
: number): void {
1486 const sizeOffset
= 1
1487 if ((this.opts
.tasksQueueOptions
?.size
as number) <= sizeOffset
) {
1490 const sourceWorkerNode
=
1491 this.workerNodes
[this.getWorkerNodeKeyByWorkerId(workerId
)]
1492 const workerNodes
= this.workerNodes
1495 (workerNodeA
, workerNodeB
) =>
1496 workerNodeA
.usage
.tasks
.queued
- workerNodeB
.usage
.tasks
.queued
1498 for (const [workerNodeKey
, workerNode
] of workerNodes
.entries()) {
1500 sourceWorkerNode
.usage
.tasks
.queued
> 0 &&
1501 workerNode
.info
.ready
&&
1502 workerNode
.info
.id
!== workerId
&&
1503 workerNode
.usage
.tasks
.queued
<
1504 (this.opts
.tasksQueueOptions
?.size
as number) - sizeOffset
1506 const task
= sourceWorkerNode
.popTask() as Task
<Data
>
1507 if (this.shallExecuteTask(workerNodeKey
)) {
1508 this.executeTask(workerNodeKey
, task
)
1510 this.enqueueTask(workerNodeKey
, task
)
1512 this.updateTaskStolenStatisticsWorkerUsage(
1521 * This method is the listener registered for each worker message.
1523 * @returns The listener function to execute when a message is received from a worker.
1525 protected workerListener (): (message
: MessageValue
<Response
>) => void {
1527 this.checkMessageWorkerId(message
)
1528 if (message
.ready
!= null && message
.taskFunctionNames
!= null) {
1529 // Worker ready response received from worker
1530 this.handleWorkerReadyResponse(message
)
1531 } else if (message
.taskId
!= null) {
1532 // Task execution response received from worker
1533 this.handleTaskExecutionResponse(message
)
1534 } else if (message
.taskFunctionNames
!= null) {
1535 // Task function names message received from worker
1537 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1538 ).taskFunctionNames
= message
.taskFunctionNames
1543 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1544 if (message
.ready
=== false) {
1546 `Worker ${message.workerId as number} failed to initialize`
1549 const workerInfo
= this.getWorkerInfo(
1550 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1552 workerInfo
.ready
= message
.ready
as boolean
1553 workerInfo
.taskFunctionNames
= message
.taskFunctionNames
1555 this.emitter
?.emit(PoolEvents
.ready
, this.info
)
1559 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1560 const { taskId
, workerError
, data
} = message
1561 const promiseResponse
= this.promiseResponseMap
.get(taskId
as string)
1562 if (promiseResponse
!= null) {
1563 if (workerError
!= null) {
1564 this.emitter
?.emit(PoolEvents
.taskError
, workerError
)
1565 promiseResponse
.reject(workerError
.message
)
1567 promiseResponse
.resolve(data
as Response
)
1569 const workerNodeKey
= promiseResponse
.workerNodeKey
1570 this.afterTaskExecutionHook(workerNodeKey
, message
)
1571 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1572 this.promiseResponseMap
.delete(taskId
as string)
1574 this.opts
.enableTasksQueue
=== true &&
1575 this.tasksQueueSize(workerNodeKey
) > 0 &&
1576 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
1577 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1581 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1587 private checkAndEmitTaskExecutionEvents (): void {
1589 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
1593 private checkAndEmitTaskQueuingEvents (): void {
1594 if (this.hasBackPressure()) {
1595 this.emitter
?.emit(PoolEvents
.backPressure
, this.info
)
1599 private checkAndEmitDynamicWorkerCreationEvents (): void {
1600 if (this.type === PoolTypes
.dynamic
) {
1602 this.emitter
?.emit(PoolEvents
.full
, this.info
)
1608 * Gets the worker information given its worker node key.
1610 * @param workerNodeKey - The worker node key.
1611 * @returns The worker information.
1613 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
{
1614 return this.workerNodes
[workerNodeKey
].info
1618 * Adds the given worker in the pool worker nodes.
1620 * @param worker - The worker.
1621 * @returns The added worker node key.
1622 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1624 private addWorkerNode (worker
: Worker
): number {
1625 const workerNode
= new WorkerNode
<Worker
, Data
>(
1627 this.opts
.tasksQueueOptions
?.size
?? Math.pow(this.maxSize
, 2)
1629 // Flag the worker node as ready at pool startup.
1630 if (this.starting
) {
1631 workerNode
.info
.ready
= true
1633 this.workerNodes
.push(workerNode
)
1634 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1635 if (workerNodeKey
=== -1) {
1636 throw new Error('Worker added not found in worker nodes')
1638 return workerNodeKey
1642 * Removes the given worker from the pool worker nodes.
1644 * @param worker - The worker.
1646 private removeWorkerNode (worker
: Worker
): void {
1647 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1648 if (workerNodeKey
!== -1) {
1649 this.workerNodes
.splice(workerNodeKey
, 1)
1650 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
1655 public hasWorkerNodeBackPressure (workerNodeKey
: number): boolean {
1657 this.opts
.enableTasksQueue
=== true &&
1658 this.workerNodes
[workerNodeKey
].hasBackPressure()
1662 private hasBackPressure (): boolean {
1664 this.opts
.enableTasksQueue
=== true &&
1665 this.workerNodes
.findIndex(
1666 workerNode
=> !workerNode
.hasBackPressure()
1672 * Executes the given task on the worker given its worker node key.
1674 * @param workerNodeKey - The worker node key.
1675 * @param task - The task to execute.
1677 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1678 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1679 this.sendToWorker(workerNodeKey
, task
, task
.transferList
)
1680 this.checkAndEmitTaskExecutionEvents()
1683 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1684 const tasksQueueSize
= this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1685 this.checkAndEmitTaskQueuingEvents()
1686 return tasksQueueSize
1689 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1690 return this.workerNodes
[workerNodeKey
].dequeueTask()
1693 private tasksQueueSize (workerNodeKey
: number): number {
1694 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1697 protected flushTasksQueue (workerNodeKey
: number): void {
1698 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1701 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1704 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1707 private flushTasksQueues (): void {
1708 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1709 this.flushTasksQueue(workerNodeKey
)