docs: refine comments
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import {
25 type IPool,
26 PoolEmitter,
27 PoolEvents,
28 type PoolInfo,
29 type PoolOptions,
30 type PoolType,
31 PoolTypes,
32 type TasksQueueOptions
33 } from './pool'
34 import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerType,
39 WorkerUsage
40 } from './worker'
41 import {
42 type MeasurementStatisticsRequirements,
43 Measurements,
44 WorkerChoiceStrategies,
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
47 } from './selection-strategies/selection-strategies-types'
48 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
49 import { version } from './version'
50 import { WorkerNode } from './worker-node'
51
52 /**
53 * Base class that implements some shared logic for all poolifier pools.
54 *
55 * @typeParam Worker - Type of worker managed by this pool.
56 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
57 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
58 */
59 export abstract class AbstractPool<
60 Worker extends IWorker,
61 Data = unknown,
62 Response = unknown
63 > implements IPool<Worker, Data, Response> {
64 /** @inheritDoc */
65 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
66
67 /** @inheritDoc */
68 public readonly emitter?: PoolEmitter
69
70 /**
71 * The task execution response promise map:
72 * - `key`: The message id of each submitted task.
73 * - `value`: An object that contains the worker node key and the execution response promise resolve and reject callbacks.
74 *
75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
76 */
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
79
80 /**
81 * Worker choice strategy context referencing a worker choice algorithm implementation.
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
84 Worker,
85 Data,
86 Response
87 >
88
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
94 /**
95 * Whether the pool is started or not.
96 */
97 private started: boolean
98 /**
99 * Whether the pool is starting or not.
100 */
101 private starting: boolean
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
/**
 * Builds a new poolifier pool.
 *
 * The arguments are validated first. Then the event emitter (when events are enabled) and
 * the worker choice strategy context are created, the setup hook is run and, when the
 * `startWorkers` option is `true`, the pool is started.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if one of the arguments is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const createdFromMain = this.isMain()
  if (!createdFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation; the pool options check also fills in the defaults.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the task scheduling methods to this pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, opts.workerChoiceStrategy, opts.workerChoiceStrategyOptions)

  this.setupHook()

  this.started = false
  this.starting = false
  if (opts.startWorkers === true) {
    this.start()
  }

  // Reference point used by the pool utilization computation.
  this.startTimestamp = performance.now()
}
155
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is nullish, not a string, blank, or does not exist on the file system.
 */
private checkFilePath (filePath: string): void {
  const missingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (missingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
168
/**
 * Validates the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is nullish.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
186
/**
 * Validates the size bounds of a dynamic pool. Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is nullish or not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, lower than or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
212
/**
 * Validates the pool options and stores the defaulted values on `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue options are only validated and built when the queue is enabled.
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    const requestedTasksQueueOptions =
      opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(requestedTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
241
/**
 * Validates that the given worker choice strategy is one of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
251
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from the pool maximum size or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node, up to the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
295
/**
 * Validates the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const size = tasksQueueOptions?.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
332
/** @inheritDoc */
public get info (): PoolInfo {
  type PoolWorkerNode = IWorkerNode<Worker, Data>
  const nodes = this.workerNodes
  const strategyContext = this.workerChoiceStrategyContext
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Number of worker nodes matching the given predicate.
  const countNodes = (matches: (node: PoolWorkerNode) => boolean): number =>
    nodes.filter(matches).length
  // Sum of a per worker node numeric figure over all the worker nodes.
  const sumOverNodes = (pick: (node: PoolWorkerNode) => number): number =>
    nodes.reduce((total, node) => total + pick(node), 0)
  // Concatenation of a per worker node measurement history over all the worker nodes.
  const mergeHistories = (
    pick: (node: PoolWorkerNode) => number[]
  ): number[] =>
    nodes.reduce<number[]>((merged, node) => merged.concat(pick(node)), [])

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate &&
      strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: nodes.length,
    idleWorkerNodes: countNodes(node => node.usage.tasks.executing === 0),
    busyWorkerNodes: countNodes(node => node.usage.tasks.executing > 0),
    executedTasks: sumOverNodes(node => node.usage.tasks.executed),
    executingTasks: sumOverNodes(node => node.usage.tasks.executing),
    // Tasks queue figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverNodes(node => node.usage.tasks.queued),
      maxQueuedTasks: sumOverNodes(
        node => node.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverNodes(node => node.usage.tasks.stolen)
    }),
    failedTasks: sumOverNodes(node => node.usage.tasks.failed),
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate && {
      runTime: {
        minimum: round(
          min(...nodes.map(node => node.usage.runTime?.minimum ?? Infinity))
        ),
        maximum: round(
          max(...nodes.map(node => node.usage.runTime?.maximum ?? -Infinity))
        ),
        ...(strategyContext.getTaskStatisticsRequirements().runTime
          .average && {
          average: round(
            average(mergeHistories(node => node.usage.runTime.history))
          )
        }),
        ...(strategyContext.getTaskStatisticsRequirements().runTime.median && {
          median: round(
            median(mergeHistories(node => node.usage.runTime.history))
          )
        })
      }
    }),
    ...(strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(...nodes.map(node => node.usage.waitTime?.minimum ?? Infinity))
        ),
        maximum: round(
          max(...nodes.map(node => node.usage.waitTime?.maximum ?? -Infinity))
        ),
        ...(strategyContext.getTaskStatisticsRequirements().waitTime
          .average && {
          average: round(
            average(mergeHistories(node => node.usage.waitTime.history))
          )
        }),
        ...(strategyContext.getTaskStatisticsRequirements().waitTime
          .median && {
          median: round(
            median(mergeHistories(node => node.usage.waitTime.history))
          )
        })
      }
    })
  }
}
488
/**
 * The pool readiness boolean status: `true` once the number of ready, non dynamic
 * worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minSize
}
503
/**
 * The approximate pool utilization: the aggregated tasks run time and wait time of all
 * the worker nodes, divided by the time elapsed since the pool start timestamp multiplied
 * by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
524
525 /**
526 * The pool type. Implemented by concrete pools and reported as `type` in the pool information.
527 *
528 * If it is `'dynamic'`, it provides the `max` property and the dynamic pool size checks apply.
529 */
530 protected abstract get type (): PoolType
531
532 /**
533 * The worker type. Implemented by concrete pools and reported as `worker` in the pool information.
534 */
535 protected abstract get worker (): WorkerType
536
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
543
/**
 * The pool maximum size: the `max` property when it is defined, the number of workers
 * given at construction otherwise.
 */
protected get maxSize (): number {
  if (this.max != null) {
    return this.max
  }
  return this.numberOfWorkers
}
550
/**
 * Checks that the worker id carried by a message received from a worker is defined
 * and belongs to one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
569
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  return this.workerNodes.findIndex(
    ({ worker: candidate }) => candidate === worker
  )
}
581
/**
 * Gets the worker node key of the worker node whose worker information carries the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  return this.workerNodes.findIndex(({ info }) => info.id === workerId)
}
593
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and send it the statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
612
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered over the default ones.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
626
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const currentlyEnabled = this.opts.enableTasksQueue === true
  // Flush the tasks queues before the tasks queue gets disabled.
  if (currentlyEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
638
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop the previously stored options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
}
650
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
656
/**
 * Builds the tasks queue options by layering the given options over the defaults:
 * a size equal to the square of the pool maximum size, a concurrency of one and
 * both task stealing flags enabled.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @returns The tasks queue options with the defaults filled in.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
670
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
679
680 /**
681 * Whether the pool is busy or not.
682 *
683 * The pool busyness boolean status. Implemented by concrete pools.
684 */
685 protected abstract get busy (): boolean
686
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has spare capacity while it executes
 * fewer tasks than the tasks queue concurrency; otherwise, while it executes no task.
 *
 * @returns `true` if no ready worker node has spare capacity, `false` otherwise.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    return !this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
  }
  return !this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
}
711
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node advertising a non empty task functions list wins.
  const advertisingWorkerNode = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return advertisingWorkerNode?.info.taskFunctions ?? []
}
724
/**
 * Whether a task shall be executed right away on the given worker node: its tasks queue
 * must be empty and it must execute fewer tasks than the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed right away, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
732
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failure rejects the promise.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }

    // The submission timestamp is taken before the worker node is chosen.
    const submissionTimestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp: submissionTimestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }
    // The response promise callbacks are looked up by task id when the worker answers.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })

    const tasksQueueEnabled = this.opts.enableTasksQueue
    const executeRightAway =
      tasksQueueEnabled === false ||
      (tasksQueueEnabled === true && this.shallExecuteTask(workerNodeKey))
    if (executeRightAway) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
787
/** @inheritDoc */
public start (): void {
  this.starting = true
  // Dynamic worker nodes are not counted.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
803
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All the worker nodes are destroyed concurrently.
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
814
/**
 * Sends a kill message to the worker and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved when the worker answers `'success'` and rejected when it answers `'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, message => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
830
831 /**
832 * Terminates the worker node given its worker node key. Awaited for every worker node by `destroy()`.
833 *
834 * @param workerNodeKey - The worker node key.
835 */
836 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
837
838 /**
839 * Setup hook run by the abstract constructor once the worker choice strategy context is built and before the workers are started.
840 * Can be overridden. The default implementation does nothing.
841 *
842 * @virtual
843 */
844 protected setupHook (): void {
845 /* Intentionally empty */
846 }
847
848 /**
849 * Should return whether the worker is the main worker or not. The constructor throws when it returns `false`.
850 */
851 protected abstract isMain (): boolean
852
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics of the
 * worker node usage and, when applicable, of its task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
882
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics of the worker node usage and,
 * when applicable, of its task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
  }
}
916
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * NOTE(review): the threshold of more than two task functions presumably accounts for
 * the entries always present in the list - confirm against the worker implementation.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
931
/**
 * Updates the tasks counters of a worker usage once a task response has been received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter never goes below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
949
/**
 * Updates the run time statistics of a worker usage. Failed tasks are ignored.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      taskRunTime
    )
  }
}
963
/**
 * Updates the wait time statistics of a worker usage with the time elapsed since the
 * task timestamp. A task without timestamp counts for a wait time of zero.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const submittedAt = task.timestamp ?? now
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    now - submittedAt
  )
}
976
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 * Failed tasks are ignored.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // Known utilization is averaged with the task one, otherwise the task one is taken as is.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1009
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * When a dynamic worker node gets created and the strategy policy allows its usage,
 * that new worker node is chosen.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy()
    .dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1028
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1037
1038 /**
1039 * Sends a message to the worker given its worker node key.
1040 *
1041 * @param workerNodeKey - The worker node key.
1042 * @param message - The message.
1043 * @param transferList - The optional array of transferable objects.
1044 */
1045 protected abstract sendToWorker (
1046 workerNodeKey: number,
1047 message: MessageValue<Data>,
1048 transferList?: TransferListItem[]
1049 ): void
1050
1051 /**
1052 * Creates a new worker. Called by `createAndSetupWorkerNode()` before the event handlers are attached to the worker.
1053 *
1054 * @returns Newly created worker.
1055 */
1056 protected abstract createWorker (): Worker
1057
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied handlers from the pool options are attached to the worker, along with
 * the pool own `error` and `exit` handlers, then the worker is added to the worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement of the same kind is only created on a started pool that is not starting.
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true &&
      this.started &&
      !this.starting
    if (shallRestartWorker) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(createdWorkerNodeKey)

  return createdWorkerNodeKey
}
1101
/**
 * Creates a worker node through `createAndSetupWorkerNode()` and turns it
 * into a dynamic one: a kill message listener is registered, a `checkActive`
 * message is sent to the worker and the worker information is flagged as dynamic.
 *
 * @returns The key of the newly created dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const { tasks } = this.workerNodes[senderWorkerNodeKey].usage
    // A soft kill is honored only when nothing is executing and, with the
    // tasks queue enabled, when nothing is queued either.
    const isSoftKillOnIdleWorker = (): boolean =>
      isKillBehavior(KillBehaviors.SOFT, message.kill) &&
      tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0))
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      isSoftKillOnIdleWorker()
    ) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // The strategy policy decides whether the dynamic worker is flagged ready right away.
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1144
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Abstract: every concrete pool class must provide the implementation.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1157
/**
 * Hook run once a worker node has been added to the pool.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and statistics
 * messages and, when the tasks queue is enabled, installs the task stealing callbacks.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Task stealing callbacks only make sense with the tasks queue enabled.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const { tasksQueueOptions } = this.opts
  const workerNode = this.workerNodes[workerNodeKey]
  if (tasksQueueOptions?.taskStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
1182
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Abstract: every concrete pool class must provide the implementation.
 * Called from `afterWorkerNodeSetup()` right after the message listener registration.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1189
/**
 * Sends to the worker the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const statistics = {
    runTime: runTimeRequirements.aggregate,
    elu: eluRequirements.aggregate
  }
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, { statistics, workerId })
}
1207
/**
 * Moves every task queued on the given worker node to the other ready
 * worker nodes, one task at a time, always picking the ready worker node
 * with the fewest queued tasks.
 *
 * The source worker node is never selected as destination. If no other
 * ready worker node exists, the redistribution stops and the remaining
 * tasks stay queued on the source worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queue is emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // Select the ready worker node, other than the source, with the fewest queued tasks.
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    // No eligible destination: bail out instead of re-queuing on the source forever.
    if (destinationWorkerNodeKey === -1) {
      break
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    // The task is re-addressed to the destination worker.
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1232
/**
 * Increments the stolen tasks counters of a worker node: the worker node
 * usage one and, when task function usage tracking applies, the one of the
 * given task function.
 *
 * Does nothing if no worker node exists at the given key.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Guard once so that every later access is safe.
  if (workerNode == null) {
    return
  }
  if (workerNode.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Single lookup of the task function usage.
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1251
/**
 * Steals one task for the worker with the given id from the ready worker
 * node, other than itself, that has the most queued tasks.
 *
 * @param workerId - The id of the worker receiving the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  // Work on a copy ordered by decreasing number of queued tasks.
  const mostQueuedFirst = [...this.workerNodes].sort(
    (left, right) => right.usage.tasks.queued - left.usage.tasks.queued
  )
  const sourceWorkerNode = mostQueuedFirst.find(
    candidate =>
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
  )
  // Nothing to steal.
  if (sourceWorkerNode == null) {
    return
  }
  // The stolen task is re-addressed to the destination worker.
  const stolenTask = {
    ...(sourceWorkerNode.popTask() as Task<Data>),
    workerId: destinationWorkerNode.info.id as number
  }
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1283
/**
 * Hands tasks queued on the worker with the given id over to the other
 * ready worker nodes, visiting them by increasing number of queued tasks.
 *
 * A worker node receives a task only while the source still has queued
 * tasks and its own queued tasks count is below the configured tasks queue
 * size minus one. Nothing is done when the configured size is one or less.
 *
 * @param workerId - The id of the worker whose tasks are handed over.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueSizeOption = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueSizeOption <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sorted copy: positions in it are NOT pool worker node keys.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueSizeOption - sizeOffset
    ) {
      // Resolve the real key in the pool worker nodes array.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      // The task is re-addressed to the receiving worker.
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1321
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches on the message
 * content: worker ready response, task execution response or task functions update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      )
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1344
/**
 * Handles a worker ready response: stores the ready flag and the task
 * functions in the worker information, then emits the pool `ready` event
 * if an emitter exists and the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions } = message
  if (ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  if (this.emitter == null) {
    return
  }
  if (this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1358
/**
 * Handles a task execution response: settles the pending promise matching
 * the task id, runs the after task execution hook, updates the worker choice
 * strategy context and, with the tasks queue enabled, starts the next queued
 * task if the worker node is below its configured concurrency.
 *
 * Messages whose task id matches no pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (taskError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    // The promise is rejected with the error message.
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallRunNextQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallRunNextQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
1386
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1392
/**
 * Emits the pool `backPressure` event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1398
/**
 * Emits the pool `full` event, with the pool information, when the pool is
 * dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1406
/**
 * Looks up the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1416
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Second constructor argument: the configured tasks queue size,
  // defaulting to the square of the pool maximum size.
  const workerNodeQueueSize =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const workerNode = new WorkerNode<Worker, Data>(worker, workerNodeQueueSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1440
/**
 * Removes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context.
 * Does nothing if the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1453
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queue there is no back pressure.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1461
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1470
/**
 * Runs the before task execution hook, sends the task to the worker of the
 * given worker node and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // The task transfer list is forwarded along with the task itself.
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1482
/**
 * Enqueues the task on the given worker node, then emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1488
/**
 * Dequeues a task from the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1492
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1496
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.workerNodes[workerNodeKey].tasksQueueSize() > 0) {
    const nextTask = this.workerNodes[workerNodeKey].dequeueTask() as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1506
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1512 }