docs: refine benchmark README.md
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import {
25 type IPool,
26 PoolEmitter,
27 PoolEvents,
28 type PoolInfo,
29 type PoolOptions,
30 type PoolType,
31 PoolTypes,
32 type TasksQueueOptions
33 } from './pool'
34 import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerType,
39 WorkerUsage
40 } from './worker'
41 import {
42 type MeasurementStatisticsRequirements,
43 Measurements,
44 WorkerChoiceStrategies,
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
47 } from './selection-strategies/selection-strategies-types'
48 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
49 import { version } from './version'
50 import { WorkerNode } from './worker-node'
51
52 /**
53 * Base class that implements some shared logic for all poolifier pools.
54 *
55 * @typeParam Worker - Type of worker which manages this pool.
56 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
57 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
58 */
59 export abstract class AbstractPool<
60 Worker extends IWorker,
61 Data = unknown,
62 Response = unknown
63 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task message id.
 *
 * - `key`: The message id of each submitted task.
 * - `value`: The promise `resolve` and `reject` callbacks of the task execution, along with the key of the worker node the task was assigned to.
 *
 * An entry is looked up by message id when a worker message is received, in order to settle the matching promise.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map()

/**
 * Context holding the worker choice strategy implementation currently in use.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Placeholder for the maximum size of a dynamic pool.
 */
protected readonly max?: number

/**
 * `true` once the pool has been started, `false` otherwise.
 */
private started: boolean
/**
 * `true` while the pool is creating its initial worker nodes, `false` otherwise.
 */
private starting: boolean
/**
 * Pool start timestamp, as returned by `performance.now()`.
 */
private readonly startTimestamp: number
107
/**
 * Builds a new poolifier pool.
 *
 * The arguments are validated first, then the optional event emitter and the
 * worker choice strategy context are created, the setup hook is run and, if
 * `opts.startWorkers` is `true`, the pool is started.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not created from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. checkPoolOptions() also fills in option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below so they can be handed over as callbacks.
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.started = false
  this.starting = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
156
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank, or does not exist.
 */
private checkFilePath (filePath: string): void {
  const isNonBlankString =
    filePath != null &&
    typeof filePath === 'string' &&
    filePath.trim().length > 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const fileExists = existsSync(filePath)
  if (!fileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
169
/**
 * Checks the number of workers given at pool instantiation.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused a zero number of workers.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
187
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing if the pool type is not `dynamic`.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is not specified or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, or is inferior or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
213
/**
 * Checks the pool options and writes the default value of each unset option to `this.opts`.
 *
 * Tasks queue options are only checked and built when the tasks queue is enabled.
 *
 * @param opts - The pool options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
242
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
252
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from the pool maximum size, or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node, up to the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
296
/**
 * Checks the tasks queue options. Undefined options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const size = tasksQueueOptions?.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
333
/** @inheritDoc */
public get info (): PoolInfo {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Counts the worker nodes matching the given predicate.
  const countWorkerNodes = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode) => (matches(workerNode) ? count + 1 : count),
      0
    )
  // Sums a numeric value picked from each worker node.
  const sumWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Concatenates a measurement history picked from each worker node usage.
  const gatherHistory = (
    pick: (workerUsage: WorkerUsage) => number[]
  ): number[] =>
    this.workerNodes.reduce<number[]>(
      (history, workerNode) => history.concat(pick(workerNode.usage)),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(requirements.runTime.aggregate &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumWorkerNodes(workerNode => workerNode.usage.tasks.failed),
    ...(requirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(requirements.runTime.average && {
          average: round(
            average(
              gatherHistory(workerUsage => workerUsage.runTime.history)
            )
          )
        }),
        ...(requirements.runTime.median && {
          median: round(
            median(gatherHistory(workerUsage => workerUsage.runTime.history))
          )
        })
      }
    }),
    ...(requirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(requirements.waitTime.average && {
          average: round(
            average(
              gatherHistory(workerUsage => workerUsage.waitTime.history)
            )
          )
        }),
        ...(requirements.waitTime.median && {
          median: round(
            median(
              gatherHistory(workerUsage => workerUsage.waitTime.history)
            )
          )
        })
      }
    })
  }
}
489
/**
 * Whether the pool is ready or not.
 *
 * The pool is ready when the number of ready, non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
504
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated tasks run time and wait time of all worker nodes,
 * divided by the time elapsed since the pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
525
/**
 * The type of the pool.
 *
 * A pool of type `'dynamic'` provides the `max` property.
 */
protected abstract get type (): PoolType
532
/**
 * The type of worker used by the pool.
 */
protected abstract get worker (): WorkerType
537
/**
 * The minimum size of the pool, i.e. the number of workers given at pool instantiation.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
544
/**
 * The maximum size of the pool: `max` when it is defined, the number of workers otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
551
/**
 * Checks that a message received from a worker carries the id of a worker known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any pool worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
570
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The key of the first worker node holding the worker, `-1` if there is none.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
582
/**
 * Looks up the worker node key matching the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The key of the first worker node whose worker info id equals the worker id, `-1` if there is none.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
594
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node has its usage reset and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
613
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options override the default ones.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
627
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the worker nodes tasks queues when an enabled tasks queue gets disabled.
  const disabling = this.opts.enableTasksQueue === true && !enable
  if (disabling) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
639
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue not enabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
}
651
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
657
/**
 * Builds the tasks queue options by overriding the default values with the given ones.
 *
 * The default queue size is the square of the pool maximum size.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @returns The tasks queue options with defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
671
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
680
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, to be provided by the pool implementation.
 */
protected abstract get busy (): boolean
687
/**
 * Whether every ready worker node is executing its tasks quota or not.
 *
 * With the tasks queue enabled, the quota is the tasks queue `concurrency` option.
 * Otherwise a worker node is at quota as soon as it executes one task.
 *
 * @returns `true` if no ready worker node is below its quota, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasSpareCapacity =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  return !this.workerNodes.some(hasSpareCapacity)
}
712
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node advertising a non empty task functions list wins.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
725
/**
 * Whether a task shall be executed right away on the given worker node or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and it executes fewer tasks than the tasks queue `concurrency` option, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return executing < (this.opts.tasksQueueOptions?.concurrency as number)
}
733
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failing check rejects the promise.
    let rejection: Error | undefined
    if (!this.started) {
      rejection = new Error('Cannot execute a task on not started pool')
    } else if (name != null && typeof name !== 'string') {
      rejection = new TypeError('name argument must be a string')
    } else if (name != null && name.trim().length === 0) {
      rejection = new TypeError('name argument must not be an empty string')
    } else if (transferList != null && !Array.isArray(transferList)) {
      rejection = new TypeError('transferList argument must be an array')
    }
    if (rejection != null) {
      reject(rejection)
      return
    }
    // The task timestamp is taken before the worker node is chosen.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so they can be looked up later by task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
788
/** @inheritDoc */
public start (): void {
  this.starting = true
  // Dynamic worker nodes do not count towards the number of workers to create.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
804
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Request the termination of every worker node, then wait for all of them.
  const terminations = this.workerNodes.map(async (_, workerNodeKey) => {
    await this.destroyWorkerNode(workerNodeKey)
  })
  await Promise.all(terminations)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
815
/**
 * Sends a kill message to a worker and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved when the worker answers with a `'success'` kill message and rejected when it answers with a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  const killAnswer = new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, message => {
      if (message.kill === 'success') {
        resolve()
      } else if (message.kill === 'failure') {
        reject(new Error(`Worker ${workerId} kill message handling failed`))
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
  await killAnswer
}
831
/**
 * Terminates the worker node identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
838
/**
 * Hook run by the abstract constructor before any worker node is created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
848
/**
 * Should return whether the worker is the main worker or not.
 * The constructor refuses to build a pool when it returns `false`.
 */
protected abstract isMain (): boolean
853
/**
 * Hook run before a task is executed by a worker.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics,
 * on the worker node usage and, when applicable, on the task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const recordTaskStart = (workerUsage: WorkerUsage): void => {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    recordTaskStart(this.workerNodes[workerNodeKey].usage)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(task.name as string)
    if (taskFunctionWorkerUsage != null) {
      recordTaskStart(taskFunctionWorkerUsage)
    }
  }
}
883
/**
 * Hook run after a task has been executed by a worker.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics, on the worker node usage and,
 * when applicable, on the task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const recordTaskEnd = (workerUsage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    recordTaskEnd(this.workerNodes[workerNodeKey].usage)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
    if (taskFunctionWorkerUsage != null) {
      recordTaskEnd(taskFunctionWorkerUsage)
    }
  }
}
917
/**
 * Whether the task function worker usage of a worker node shall be updated or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker info lists more than two task functions, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
932
/**
 * Updates the tasks counters of a worker usage once a task execution has ended.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter is never decremented below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
950
/**
 * Updates the run time statistics of a worker usage.
 * Nothing is updated when the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      taskRunTime
    )
  }
}
964
/**
 * Updates the wait time statistics of a worker usage.
 *
 * The task wait time is the time elapsed since the task timestamp, zero if the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const submittedAt = task.timestamp ?? now
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    now - submittedAt
  )
}
977
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 * Nothing is updated when the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0
  )
  if (eluTaskStatisticsRequirements.aggregate && taskElu != null) {
    // The stored utilization is averaged with the task one, or initialized with it.
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
1010
/**
 * Chooses the worker node that will run the next task.
 *
 * A dynamic worker node is created first when the conditions for it are met.
 * It is chosen directly if the strategy policy `dynamicWorkerUsage` is set,
 * otherwise the choice is delegated to the worker choice strategy.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy()
    .dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1029
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1038
/**
 * Sends a message to the worker of the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1051
/**
 * Creates a new worker, to be provided by the pool implementation.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1058
/**
 * Creates a worker, registers the pool event handlers on it and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts

  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const workerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    workerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement worker node is only created on a started pool that is not starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true &&
      this.started &&
      !this.starting
    if (shallRestart) {
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
1102
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, it registers a listener that
 * destroys the worker node when the worker sends a kill message, sends a
 * `checkActive` message to the worker and flags the worker info as dynamic.
 *
 * @returns The key of the newly created and set up dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderWorkerUsage = this.workerNodes[senderWorkerNodeKey].usage
    // A soft kill is only honored when the worker node has no executing task
    // and, if the tasks queue is enabled, no queued task either.
    const isSoftKillAllowed = (): boolean => {
      if (senderWorkerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === true) {
        return this.tasksQueueSize(senderWorkerNodeKey) === 0
      }
      return this.opts.enableTasksQueue === false
    }
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        isSoftKillAllowed())
    ) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // Flag the dynamic worker as ready right away if the worker choice strategy
  // policy asks for it.
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1145
/**
 * Registers a message listener callback on the worker identified by its
 * worker node key. Must be implemented by the concrete pool.
 *
 * @typeParam Message - Type of the message payload, either pool data or pool response.
 * @param workerNodeKey - The key of the worker node whose worker gets the listener.
 * @param listener - The callback invoked with each received message.
 */
protected abstract registerWorkerMessageListener<Message extends Data | Response>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1158
/**
 * Hook run right after a worker node has been added to the pool.
 * Can be overridden.
 *
 * It registers the pool message listener on the worker, sends the startup
 * and statistics messages, and, when the tasks queue is enabled, installs
 * the task stealing callbacks selected in the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
1183
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The key of the worker node whose worker receives the startup message.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1190
/**
 * Sends the statistics message to the worker identified by its worker node
 * key. The message carries the `aggregate` flags of the run time and ELU
 * task statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: runTime.aggregate,
    elu: elu.aggregate
  }
  this.sendToWorker(workerNodeKey, {
    statistics,
    workerId: this.getWorkerInfo(workerNodeKey).id as number
  })
}
1208
/**
 * Moves every task queued on the given worker node to other worker nodes.
 *
 * For each queued task, the destination is the ready worker node, other than
 * the source one, with the fewest queued tasks (the first one wins on ties).
 * The task is executed right away when `shallExecuteTask()` allows it,
 * enqueued on the destination otherwise.
 *
 * If no other ready worker node exists, the redistribution stops and the
 * remaining tasks stay queued on the source worker node: handing them back
 * to the source would either loop forever or target the worker being drained.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      if (
        candidateWorkerNodeKey !== workerNodeKey &&
        candidateWorkerNode.info.ready &&
        candidateWorkerNode.usage.tasks.queued < minQueuedTasks
      ) {
        minQueuedTasks = candidateWorkerNode.usage.tasks.queued
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: bail out instead of spinning on the source.
      break
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1233
/**
 * Increments the stolen tasks counters of a worker node.
 *
 * The worker node usage counter is always incremented when available. The
 * task function usage counter is incremented only when
 * `shallUpdateTaskFunctionWorkerUsage()` allows it and the worker node has a
 * usage entry for the given task name.
 *
 * Does nothing if no worker node exists at the given key.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Guard once for both updates: the worker node may be gone already.
  if (workerNode == null) {
    return
  }
  if (workerNode.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup, reused for both the null check and the update.
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.stolen
    }
  }
}
1252
/**
 * Steals one task for the worker identified by the given id.
 *
 * The source is the first worker node, in descending order of queued tasks,
 * that is ready, is not the requesting worker and has at least one queued
 * task. The task popped from it is executed on the requesting worker node
 * when `shallExecuteTask()` allows it, enqueued there otherwise, and the
 * stolen tasks statistics of the requesting worker node are updated.
 *
 * @param workerId - The id of the worker receiving the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const receiverWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const receiverWorkerNode = this.workerNodes[receiverWorkerNodeKey]
  const donorWorkerNode = [...this.workerNodes]
    .sort((nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued)
    .find(
      node =>
        node.info.ready &&
        node.info.id !== workerId &&
        node.usage.tasks.queued > 0
    )
  if (donorWorkerNode == null) {
    return
  }
  const stolenTask = {
    ...(donorWorkerNode.popTask() as Task<Data>),
    workerId: receiverWorkerNode.info.id as number
  }
  if (this.shallExecuteTask(receiverWorkerNodeKey)) {
    this.executeTask(receiverWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(receiverWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    receiverWorkerNodeKey,
    stolenTask.name as string
  )
}
1284
/**
 * Offloads queued tasks from the worker identified by the given id to other
 * worker nodes.
 *
 * Candidate worker nodes are visited in ascending order of queued tasks.
 * Each candidate that is ready, is not the source worker and whose queue
 * holds fewer tasks than the configured tasks queue size minus one receives
 * one task popped from the source worker node. The task is executed right
 * away when `shallExecuteTask()` allows it, enqueued otherwise, and the
 * stolen tasks statistics of the receiving worker node are updated.
 *
 * Nothing is done when the configured tasks queue size is lower than or
 * equal to one, or when the source worker node cannot be found.
 *
 * @param workerId - The id of the worker whose queued tasks are offloaded.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const maxQueueSize = this.opts.tasksQueueOptions?.size as number
  if (maxQueueSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    return
  }
  const sortedWorkerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of sortedWorkerNodes) {
    if (sourceWorkerNode.usage.tasks.queued <= 0) {
      // Nothing left to offload.
      break
    }
    if (
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < maxQueueSize - sizeOffset
    ) {
      // The position in the sorted copy is NOT the worker node key: resolve
      // the real key in the pool worker nodes before dispatching the task.
      const destinationWorkerNodeKey = this.workerNodes.indexOf(workerNode)
      if (destinationWorkerNodeKey === -1) {
        continue
      }
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (this.shallExecuteTask(destinationWorkerNodeKey)) {
        this.executeTask(destinationWorkerNodeKey, task)
      } else {
        this.enqueueTask(destinationWorkerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        destinationWorkerNodeKey,
        task.name as string
      )
    }
  }
}
1322
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the message worker id, then dispatches on the
 * message content: worker ready response, task execution response or task
 * functions update. Any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      )
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1345
/**
 * Handles the ready response sent by a worker.
 *
 * Stores the ready flag and the task functions in the worker info, then
 * emits the pool `ready` event if an emitter exists and the pool is ready.
 *
 * @param message - The ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, workerId, taskFunctions } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  const emitter = this.emitter
  if (emitter != null && this.ready) {
    emitter.emit(PoolEvents.ready, this.info)
  }
}
1359
/**
 * Handles the task execution response sent by a worker.
 *
 * Settles the promise registered for the task id: rejected with the task
 * error message (after emitting the `taskError` event) or resolved with the
 * response data. It then runs the after task execution hook, updates the
 * worker choice strategy context and removes the promise from the map.
 * Finally, when the tasks queue is enabled, the worker node has queued tasks
 * and runs fewer tasks than the configured concurrency, the next queued task
 * is executed.
 *
 * Messages whose task id has no registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  if (pendingResponse == null) {
    return
  }
  const { taskError, data } = message
  if (taskError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingResponse.reject(taskError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
}
1387
/**
 * Emits the pool `busy` event, with the pool information, if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1393
/**
 * Emits the pool `backPressure` event, with the pool information, if the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1399
/**
 * Emits the pool `full` event, with the pool information, if the pool type
 * is dynamic and the pool is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1407
/**
 * Gets the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1417
/**
 * Wraps the given worker into a worker node and appends it to the pool
 * worker nodes.
 *
 * The worker node is built with the configured tasks queue size, falling
 * back to the square of the pool maximum size when none is configured.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueMaxSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
1441
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context.
 * Does nothing if the worker is not found in the pool worker nodes.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1454
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1462
/**
 * Tells whether the pool has back pressure.
 *
 * @returns `true` if the tasks queue is enabled and no worker node is without back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1471
/**
 * Executes the given task on the worker identified by its worker node key.
 *
 * Runs the before task execution hook, sends the task to the worker together
 * with its transfer list, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1483
/**
 * Enqueues the given task on the worker node identified by its key, then
 * checks and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
1489
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1493
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1497
/**
 * Executes every task still queued on the worker node identified by its key,
 * then clears the worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  for (; this.tasksQueueSize(workerNodeKey) > 0;) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1507
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1513 }