feat: use SMA and SMM for worker tasks usage
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task,
9 Writable
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
15 average,
16 isKillBehavior,
17 isPlainObject,
18 median,
19 round,
20 updateMeasurementStatistics
21 } from '../utils'
22 import { KillBehaviors } from '../worker/worker-options'
23 import {
24 type IPool,
25 PoolEmitter,
26 PoolEvents,
27 type PoolInfo,
28 type PoolOptions,
29 type PoolType,
30 PoolTypes,
31 type TasksQueueOptions
32 } from './pool'
33 import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
39 } from './worker'
40 import {
41 type MeasurementStatisticsRequirements,
42 Measurements,
43 WorkerChoiceStrategies,
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
46 } from './selection-strategies/selection-strategies-types'
47 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
48 import { version } from './version'
49 import { WorkerNode } from './worker-node'
50
51 /**
52 * Base class that implements some shared logic for all poolifier pools.
53 *
54 * @typeParam Worker - Type of worker which manages this pool.
55 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
56 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
57 */
58 export abstract class AbstractPool<
59 Worker extends IWorker,
60 Data = unknown,
61 Response = unknown
62 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions waiting for a worker response.
 *
 * - `key`: The id of each submitted task.
 * - `value`: The wrapper holding the promise `resolve` and `reject` callbacks and the key of the worker node the task was routed to.
 */
protected promiseResponseMap = new Map<
string,
PromiseResponseWrapper<Response>
>()

/**
 * Context delegating the worker node selection to the configured worker choice strategy.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Maximum size placeholder, meaningful for dynamic pools only.
 */
protected readonly max?: number

/**
 * `true` while the constructor is creating the initial worker nodes.
 */
private readonly starting: boolean
/**
 * `true` once the constructor has created the initial worker nodes, `false` again after `destroy()`.
 */
private started: boolean
/**
 * Timestamp taken at the end of the pool construction.
 */
private readonly startTimestamp
106
/**
 * Builds the pool: validates the arguments, wires the worker choice strategy
 * context, then creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error If the pool is instantiated from a worker of the same type as the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation, each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // These methods are handed over as callbacks: keep them bound to the pool.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // The 'starting' flag is only raised while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
154
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws Error If the path is not a non blank string or if the file does not exist.
 */
private checkFilePath (filePath: string): void {
  // A nullish value is not a string, so the typeof test also covers it.
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
167
/**
 * Validates the number of workers given to the constructor.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error If the number of workers is nullish.
 * @throws TypeError If the number of workers is not a safe integer.
 * @throws RangeError If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
185
/**
 * Validates the size boundaries of a dynamic pool. Does nothing for any other pool type.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError If the maximum size is nullish or is not a safe integer.
 * @throws RangeError If the maximum size is zero, lower than or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
211
/**
 * Validates the pool options and stores the defaulted values into `this.opts`.
 *
 * @param opts - The pool options.
 * @throws TypeError If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)
  // User supplied strategy options take precedence over the defaults.
  poolOptions.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  if (poolOptions.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
239
/**
 * Validates that the given worker choice strategy is one of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @throws Error If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
249
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 * @throws TypeError If the options are not a plain object or if the choice retries is not a safe integer.
 * @throws RangeError If the choice retries is lower than or equal to zero.
 * @throws Error If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
293
/**
 * Validates the tasks queue options. Nullish options are accepted as is.
 *
 * When `queueMaxSize` is set, its value is copied into `size` on the given object before `size` is validated.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @throws TypeError If the options are not a plain object, or if `concurrency` or `size` is not a safe integer.
 * @throws RangeError If `concurrency` or `size` is lower than or equal to zero.
 * @throws Error If both `queueMaxSize` and `size` are specified.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: Writable<TasksQueueOptions>
): void {
  if (tasksQueueOptions == null) {
    // Nothing to validate.
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, queueMaxSize } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (queueMaxSize != null) {
    if (tasksQueueOptions.size != null) {
      throw new Error(
        'Invalid tasks queue options: cannot specify both queueMaxSize and size'
      )
    }
    tasksQueueOptions.size = queueMaxSize
  }
  // Read 'size' only now: it may just have been set from 'queueMaxSize'.
  const { size } = tasksQueueOptions
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue max size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue max size: ${size} is a negative integer or zero`
      )
    }
  }
}
341
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non dynamic worker nodes.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
353
/** @inheritDoc */
public get info (): PoolInfo {
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Sums a per worker node figure over all the worker nodes.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )

  // Pool wide statistics of one measurement (run time or wait time):
  // minimum and maximum across worker nodes, average of all the recorded
  // history values and, on demand, median of the worker nodes medians.
  const buildMeasurementInfo = (
    pick: (
      usage: WorkerUsage
    ) => WorkerUsage['runTime'] | WorkerUsage['waitTime'],
    withMedian: boolean
  ) => ({
    minimum: round(
      Math.min(
        ...this.workerNodes.map(
          (workerNode) => pick(workerNode.usage)?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      Math.max(
        ...this.workerNodes.map(
          (workerNode) => pick(workerNode.usage)?.maximum ?? -Infinity
        )
      )
    ),
    average: round(
      average(
        this.workerNodes.reduce<number[]>(
          (values, workerNode) =>
            values.concat(pick(workerNode.usage).history),
          []
        )
      )
    ),
    ...(withMedian && {
      median: round(
        median(
          this.workerNodes.map(
            (workerNode) => pick(workerNode.usage)?.median ?? 0
          )
        )
      )
    })
  })

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both run time and wait time aggregates.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes((workerNode) =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes((workerNode) =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: buildMeasurementInfo(
        (usage) => usage.runTime,
        runTimeRequirements.median
      )
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: buildMeasurementInfo(
        (usage) => usage.waitTime,
        waitTimeRequirements.median
      )
    })
  }
}
491
/**
 * Whether the pool is ready or not: at least `minSize` non dynamic worker nodes are flagged ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    ({ info }) => !info.dynamic && info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
506
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time,
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  // Run time and wait time are accumulated separately, then added together.
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
527
/**
 * The type of the pool, provided by the concrete pool class.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType

/**
 * The type of worker used by the pool, provided by the concrete pool class.
 */
protected abstract get worker (): WorkerType
539
/**
 * The pool minimum size: the number of workers given to the constructor.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}

/**
 * The pool maximum size: `max` when it is set, the number of workers otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
553
/**
 * Ensures the message received from a worker carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
572
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
584
/**
 * Looks up the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  const hasWorkerId = ({ info }: IWorkerNode<Worker, Data>): boolean =>
    info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
596
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage, then send each worker a statistics message.
  for (const [key, node] of this.workerNodes.entries()) {
    node.resetUsage()
    this.sendStatisticsMessageToWorker(key)
  }
}
615
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given options take precedence over the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
629
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    // The queue is being switched off: flush the tasks queues first.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
641
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueMaxSize(builtOptions.size as number)
}
653
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The size to set.
 */
private setTasksQueueMaxSize (size: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
659
/**
 * Completes the given tasks queue options with the defaults:
 * a size equal to the square of the pool maximum size and a concurrency of one.
 *
 * @param tasksQueueOptions - The given tasks queue options, they take precedence over the defaults.
 * @returns The completed tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
671
/**
 * Whether the pool is full or not: the number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}

/**
 * Whether the pool is busy or not, as defined by the concrete pool class.
 */
protected abstract get busy (): boolean
687
/**
 * Whether every ready worker node is executing its tasks quota or not.
 *
 * With the tasks queue enabled the quota is the configured concurrency,
 * otherwise a worker node is at its quota as soon as it executes one task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasSpareCapacity =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.usage.tasks.executing === 0
  // Busy when no ready worker node has spare capacity.
  return !this.workerNodes.some(
    (workerNode) => workerNode.info.ready && hasSpareCapacity(workerNode)
  )
}
712
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node advertising a non empty list provides the answer.
  const advertisingWorkerNode = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctions) && info.taskFunctions.length > 0
  )
  return advertisingWorkerNode?.info.taskFunctions ?? []
}
725
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Calling reject() does not stop the executor: each validation failure
    // returns explicitly so that an invalid task is neither registered in the
    // promise response map nor dispatched to a worker node.
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Submission timestamp, later used to compute the task wait time.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // Keep the promise callbacks to settle the task on worker response.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the worker
    // node already executes its tasks concurrency quota.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
787
/** @inheritDoc */
public async destroy (): Promise<void> {
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  // The destroy event is emitted before the pool is flagged as not started.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
798
/**
 * Sends a kill message to the worker and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
814
/**
 * Terminates the worker node identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise settled once the concrete pool has handled the termination.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
821
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default, can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No setup by default
}
831
/**
 * Should return whether the worker is the main worker or not.
 * The constructor refuses to start the pool when it returns `false`.
 */
protected abstract isMain (): boolean
836
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks count and records the task wait time on the
 * worker node usage and, when applicable, on its task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const recordTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  const workerNodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerNodeUsage != null) {
    recordTaskStart(workerNodeUsage)
  }
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
      task.name as string
    ) != null
  ) {
    recordTaskStart(
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        task.name as string
      ) as WorkerUsage
    )
  }
}
866
/**
 * Hook executed after the worker task execution.
 * Updates the tasks, run time and ELU statistics on the worker node usage and,
 * when applicable, on its task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerNodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerNodeUsage != null) {
    recordTaskEnd(workerNodeUsage)
  }
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    ) != null
  ) {
    recordTaskEnd(
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        message.taskPerformance?.name as string
      ) as WorkerUsage
    )
  }
}
900
/**
 * Whether the worker node shall update its task function worker usage or not:
 * its worker info must list more than two task functions.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
915
/**
 * Updates the tasks counters after a task execution: one executing task less
 * and one more executed or failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing tasks count is never decremented below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.taskError != null) {
    ++tasks.failed
    return
  }
  ++tasks.executed
}
933
/**
 * Records the task run time into the worker usage. Failed tasks are ignored.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    const { runTime: runTimeRequirements } =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
947
/**
 * Records the task wait time, i.e. the time elapsed since the task timestamp,
 * into the worker usage. A task without timestamp counts for a zero wait time.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const submittedAt = task.timestamp ?? now
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    now - submittedAt
  )
}
960
/**
 * Records the task event loop utilization (ELU) into the worker usage.
 * Failed tasks are ignored.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  const workerElu = workerUsage.elu
  updateMeasurementStatistics(
    workerElu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerElu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (eluRequirements.aggregate && taskElu != null) {
    // The stored utilization is averaged with the one of the task,
    // or initialized with it when none is stored yet.
    workerElu.utilization =
      workerElu.utilization != null
        ? (workerElu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
993
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when needed. It is chosen directly
 * if the strategy policy enables dynamic worker usage, otherwise the choice
 * is delegated to the worker choice strategy.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1012
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * its worker nodes are all busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1021
/**
 * Sends a message to the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1034
/**
 * Creates a new worker, as implemented by the concrete pool class.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
1041
/**
 * Creates a new worker, registers the user supplied and pool internal event
 * handlers on it and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling: flag the worker node as not ready, close its
  // channel, replace it if requested and redistribute its queued tasks.
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(
      failedWorkerNodeKey
    ) as WorkerInfo
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true &&
      !this.starting &&
      this.started
    if (shallRestartWorker) {
      // The replacement is of the same kind as the failed worker node.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(createdWorkerNodeKey)

  return createdWorkerNodeKey
}
1085
/**
 * Creates a new worker node, registers the kill message handling on it and flags it as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Idle means no executing task and, with the tasks queue enabled, no queued task either.
    const isWorkerNodeIdle = (): boolean => {
      if (workerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(localWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker: a hard kill always destroys the
    // worker node, a soft kill only destroys it when it is idle.
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isWorkerNodeIdle())
    ) {
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    // The strategy policy asks for the dynamic worker node to be flagged ready right away.
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1128
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Within this class it is used to attach the listener returned by `workerListener()` to each
 * newly set up worker node and the kill message listener to dynamic worker nodes.
 *
 * @typeParam Message - Type parameter of the received `MessageValue`, constrained to `Data | Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1141
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup then the
 * statistics messages and, when the tasks queue is enabled, installs the task stealing
 * callbacks on the worker node.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Task stealing callbacks are only installed when tasks queuing is enabled.
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1162
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Implemented by concrete pools. `afterWorkerNodeSetup()` calls it right after the pool
 * message listener has been registered and before the statistics message is sent.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1169
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task statistics
 * requirements reported by the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const strategyContext = this.workerChoiceStrategyContext
  const statistics = {
    runTime: strategyContext.getTaskStatisticsRequirements().runTime.aggregate,
    elu: strategyContext.getTaskStatisticsRequirements().elu.aggregate
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    statistics,
    workerId: workerInfo.id as number
  })
}
1187
/**
 * Moves the tasks still queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the destination is the first other ready worker node with an empty
 * tasks queue or, failing that, the one with the fewest queued tasks. The task is executed
 * right away when the destination worker node runs fewer tasks than the configured tasks
 * queue concurrency, otherwise it is enqueued on it.
 *
 * Redistribution stops as soon as no other ready worker node is available.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (!workerNode.info.ready || workerNodeId === workerNodeKey) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        destinationWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        destinationWorkerNodeKey = workerNodeId
      }
    }
    if (destinationWorkerNodeKey == null) {
      // No other ready worker node: nothing can be dequeued, so looping again would
      // spin forever on the same non-empty queue.
      break
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    // The concurrency check must be done on the chosen destination, not on whichever
    // worker node happened to be scanned before it.
    if (
      destinationWorkerNode.usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1225
/**
 * Steals one queued task for the worker whose id is given.
 *
 * Installed as the `onEmptyQueue` callback of the worker nodes. The worker nodes are scanned
 * from the most to the least queued; the first ready one, other than the given worker, that
 * has queued tasks gets one task popped, which is executed on the given worker if it runs
 * fewer tasks than the tasks queue concurrency, and enqueued on it otherwise.
 *
 * @param workerId - The id of the worker that receives the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const idleWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const idleWorkerNode = this.workerNodes[idleWorkerNodeKey]
  const mostQueuedFirst = [...this.workerNodes].sort(
    (left, right) => right.usage.tasks.queued - left.usage.tasks.queued
  )
  for (const candidate of mostQueuedFirst) {
    // Sorted by decreasing queue size: nothing left to steal past this point.
    if (candidate.usage.tasks.queued === 0) {
      break
    }
    const eligible =
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
    if (!eligible) {
      continue
    }
    const stolenTask = {
      ...(candidate.popTask() as Task<Data>),
      workerId: idleWorkerNode.info.id as number
    }
    const canExecuteNow =
      idleWorkerNode.usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    if (canExecuteNow) {
      this.executeTask(idleWorkerNodeKey, stolenTask)
    } else {
      this.enqueueTask(idleWorkerNodeKey, stolenTask)
    }
    // Only one task is stolen per call.
    break
  }
}
1260
/**
 * Moves queued tasks away from the worker whose id is given.
 *
 * Installed as the `onBackPressure` callback of the worker nodes. The worker nodes are
 * scanned from the least to the most queued; each ready one, other than the given worker,
 * whose queue holds fewer than the tasks queue size minus one tasks receives one task popped
 * from the given worker, as long as that worker still has queued tasks. The task is executed
 * on the receiving worker node if it runs fewer tasks than the tasks queue concurrency, and
 * enqueued on it otherwise.
 *
 * @param workerId - The id of the worker whose queued tasks are moved.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - 1
    ) {
      // The position in the sorted copy is not a worker node key: look the key up in the
      // pool worker nodes so the task lands on the worker node it is addressed to.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        continue
      }
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (
        workerNode.usage.tasks.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
      ) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
    }
  }
}
1293
/**
 * Builds the listener registered for each worker message.
 *
 * After checking the message worker id, the listener dispatches on the message content:
 * worker ready response, task execution response or task functions update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ) as WorkerInfo
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1318
/**
 * Handles a worker ready response.
 *
 * Stores the ready flag and the task functions carried by the message in the worker
 * information, then emits the pool `ready` event if an emitter exists and the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerFailedToInitialize = message.ready === false
  if (workerFailedToInitialize) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  workerInfo.ready = message.ready as boolean
  workerInfo.taskFunctions = message.taskFunctions
  const shallEmitReady = this.emitter != null && this.ready
  if (shallEmitReady) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1332
/**
 * Handles a task execution response.
 *
 * Does nothing if no promise response is registered for the message task id. Otherwise the
 * promise is rejected with the task error message (after emitting the `taskError` event) or
 * resolved with the message data, the after task execution hook is run, the promise response
 * is unregistered, the next queued task is executed if the tasks queue is enabled, non-empty
 * and the worker node runs fewer tasks than the tasks queue concurrency, and finally the
 * worker choice strategy context is updated.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskError, data } = message
  const taskId = message.taskId as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  if (pendingResponse == null) {
    return
  }
  if (taskError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingResponse.reject(taskError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1360
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1366
/**
 * Emits the pool `backPressure` event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1372
/**
 * Emits the pool `full` event, with the pool information, when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1380
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if no worker node exists at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1390
/**
 * Adds the given worker in the pool worker nodes.
 *
 * The worker node is created with the configured tasks queue size, defaulting to the square
 * of the pool maximum size, and is flagged as ready when the pool is starting.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueSize =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    tasksQueueSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node added not found')
}
1415
/**
 * Removes the given worker from the pool worker nodes.
 *
 * When the worker is found, its worker node is removed from the pool worker nodes and from
 * the worker choice strategy context; otherwise nothing is done.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1428
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queuing there is no back pressure to report.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1436
/**
 * Whether the pool has back pressure.
 *
 * @returns `true` if the tasks queue is enabled and every worker node has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1445
/**
 * Executes the given task on the worker given its worker node key.
 *
 * Runs the before task execution hook, sends the task with its transfer list to the worker,
 * then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1457
/**
 * Enqueues the given task on the worker node given its key, then checks and emits the task
 * queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1463
/**
 * Dequeues a task from the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `dequeueTask()`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1467
/**
 * Gets the tasks queue size of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1471
/**
 * Flushes the tasks queue of the worker node given its key: every queued task is dequeued
 * and executed on that worker node, then the tasks queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  for (
    let remainingTasks = this.tasksQueueSize(workerNodeKey);
    remainingTasks > 0;
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1481
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1487 }