Merge dependabot/npm_and_yarn/examples/typescript/http-server-pool/fastify-worker_thr...
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task,
9 Writable
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
15 average,
16 isKillBehavior,
17 isPlainObject,
18 median,
19 round,
20 updateMeasurementStatistics
21 } from '../utils'
22 import { KillBehaviors } from '../worker/worker-options'
23 import {
24 type IPool,
25 PoolEmitter,
26 PoolEvents,
27 type PoolInfo,
28 type PoolOptions,
29 type PoolType,
30 PoolTypes,
31 type TasksQueueOptions
32 } from './pool'
33 import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
39 } from './worker'
40 import {
41 type MeasurementStatisticsRequirements,
42 Measurements,
43 WorkerChoiceStrategies,
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
46 } from './selection-strategies/selection-strategies-types'
47 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
48 import { version } from './version'
49 import { WorkerNode } from './worker-node'
50
51 /**
52 * Base class that implements some shared logic for all poolifier pools.
53 *
54 * @typeParam Worker - Type of worker which manages this pool.
55 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
56 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
57 */
58 export abstract class AbstractPool<
59 Worker extends IWorker,
60 Data = unknown,
61 Response = unknown
62 > implements IPool<Worker, Data, Response> {
63 /** @inheritDoc */
64 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
65
66 /** @inheritDoc */
67 public readonly emitter?: PoolEmitter
68
69 /**
70 * The task execution response promise map.
71 *
72 * - `key`: The message id of each submitted task.
73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
74 *
75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
76 */
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
79
80 /**
81 * Worker choice strategy context referencing a worker choice algorithm implementation.
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
84 Worker,
85 Data,
86 Response
87 >
88
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
94 /**
95 * Whether the pool is starting or not.
96 */
97 private readonly starting: boolean
98 /**
99 * Whether the pool is started or not.
100 */
101 private started: boolean
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
107 /**
108 * Constructs a new poolifier pool.
109 *
110 * @param numberOfWorkers - Number of workers that this pool should manage.
111 * @param filePath - Path to the worker file.
112 * @param opts - Options for the pool.
113 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // Construction is refused when isMain() does not hold.
  const createdFromMain = this.isMain()
  if (!createdFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Validate every constructor argument before any worker node is created.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on these methods.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // `starting` is true only while the initial worker nodes are being created.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
154
/**
 * Validates the path to the worker file.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is not a non-blank string or if no file exists at that path.
 */
private checkFilePath (filePath: string): void {
  // A nullish value fails the `typeof` test, so one predicate covers all invalid inputs.
  const isNonBlankString =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (existsSync(filePath)) {
    return
  }
  throw new Error(`Cannot find the worker file '${filePath}'`)
}
167
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 * @throws Error if the number is missing, TypeError if it is not a safe integer,
 * RangeError if it is negative or if it is zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
185
/**
 * Validates the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError if `max` is missing or is not a safe integer, RangeError if
 * `max` is lower than `min`, equal to zero, or equal to `min`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
211
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - The pool options.
 * @throws TypeError if `opts` is not a plain object; the nested checks may throw as well.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options: given values are layered over the defaults.
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
239
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
249
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object or `choiceRetries` is not a safe integer,
 * RangeError if `choiceRetries` is not greater than zero,
 * Error if `weights` does not hold one entry per worker node or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // The number of weight entries is compared to the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
293
/**
 * Validates the tasks queue options. Nullish options are accepted as is.
 *
 * When `queueMaxSize` is given, its value is copied into `size` on the passed object.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object or `concurrency`/`size` is not a safe integer,
 * RangeError if `concurrency`/`size` is negative or zero,
 * Error if both `queueMaxSize` and `size` are given.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: Writable<TasksQueueOptions>
): void {
  // Nothing to validate: every check below only applies to a defined value.
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (tasksQueueOptions.queueMaxSize != null) {
    if (tasksQueueOptions.size != null) {
      throw new Error(
        'Invalid tasks queue options: cannot specify both queueMaxSize and size'
      )
    }
    // From here on `size` carries the `queueMaxSize` value.
    tasksQueueOptions.size = tasksQueueOptions.queueMaxSize
  }
  const { size } = tasksQueueOptions
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue max size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue max size: ${size} is a negative integer or zero`
      )
    }
  }
}
341
/**
 * Creates worker nodes until the number of non-dynamic worker nodes
 * reaches `numberOfWorkers`.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
353
354 /** @inheritDoc */
public get info (): PoolInfo {
  type PoolWorkerNode = IWorkerNode<Worker, Data>
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Small aggregation helpers over the worker nodes.
  const countWorkerNodes = (
    matches: (workerNode: PoolWorkerNode) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode) => (matches(workerNode) ? count + 1 : count),
      0
    )
  const sumWorkerNodes = (
    valueOf: (workerNode: PoolWorkerNode) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + valueOf(workerNode),
      0
    )
  const lowest = (valueOf: (workerNode: PoolWorkerNode) => number): number =>
    Math.min(...this.workerNodes.map(valueOf))
  const highest = (valueOf: (workerNode: PoolWorkerNode) => number): number =>
    Math.max(...this.workerNodes.map(valueOf))
  const mergeHistories = (
    historyOf: (workerNode: PoolWorkerNode) => number[]
  ): number[] =>
    this.workerNodes.reduce<number[]>(
      (merged, workerNode) => merged.concat(historyOf(workerNode)),
      []
    )

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is reported only when both run time and wait time are aggregated.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Queue related figures are reported only when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sumWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          lowest((workerNode) => workerNode.usage.runTime?.minimum ?? Infinity)
        ),
        maximum: round(
          highest(
            (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
          )
        ),
        ...(runTimeRequirements.average && {
          average: round(
            average(
              mergeHistories((workerNode) => workerNode.usage.runTime.history)
            )
          )
        }),
        ...(runTimeRequirements.median && {
          median: round(
            median(
              mergeHistories((workerNode) => workerNode.usage.runTime.history)
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          lowest(
            (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
          )
        ),
        maximum: round(
          highest(
            (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
          )
        ),
        ...(waitTimeRequirements.average && {
          average: round(
            average(
              mergeHistories((workerNode) => workerNode.usage.waitTime.history)
            )
          )
        }),
        ...(waitTimeRequirements.median && {
          median: round(
            median(
              mergeHistories((workerNode) => workerNode.usage.waitTime.history)
            )
          )
        })
      }
    })
  }
}
501
502 /**
503 * The pool readiness boolean status.
504 */
private get ready (): boolean {
  // Only non-dynamic worker nodes flagged as ready are counted.
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
516
517 /**
518 * The approximate pool utilization.
519 *
520 * @returns The pool utilization.
521 */
private get utilization (): number {
  // Capacity: time elapsed since the pool start timestamp, times the maximum size.
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
537
538 /**
539 * The pool type.
540 *
541 * If it is `'dynamic'`, it provides the `max` property.
542 */
543 protected abstract get type (): PoolType
544
545 /**
546 * The worker type.
547 */
548 protected abstract get worker (): WorkerType
549
550 /**
551 * The pool minimum size.
552 */
protected get minSize (): number {
  // The minimum size is the number of workers given at construction.
  const { numberOfWorkers } = this
  return numberOfWorkers
}
556
557 /**
558 * The pool maximum size.
559 */
protected get maxSize (): number {
  // Falls back to the number of workers when `max` is not set.
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
563
564 /**
565 * Checks if the worker id sent in the received message from a worker is valid.
566 *
567 * @param message - The received message.
568 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
569 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // The worker id must match one of the pool worker nodes.
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
582
583 /**
584 * Gets the worker node key of the given worker.
585 *
586 * @param worker - The worker.
587 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
588 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  // Linear scan comparing worker instances by identity.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
594
595 /**
596 * Gets the worker node key given its worker id.
597 *
598 * @param workerId - The worker id.
599 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
600 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  // Linear scan comparing the worker node info id with the given id.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
606
607 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  // Validate first so an invalid strategy leaves the pool options untouched.
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
625
626 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered over the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
639
640 /** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching an enabled queue off: flush the tasks queues first.
  const isSwitchingOff = this.opts.enableTasksQueue === true && !enable
  if (isSwitchingOff) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
651
652 /** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue not enabled: drop any previously stored queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueMaxSize(builtOptions.size as number)
}
663
/**
 * Sets `tasksQueueBackPressureSize` on every worker node.
 *
 * @param size - The value to assign.
 */
private setTasksQueueMaxSize (size: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
669
/**
 * Builds the tasks queue options by layering the given ones over the defaults:
 * a `size` equal to the square of the pool maximum size and a `concurrency` of 1.
 *
 * @param tasksQueueOptions - The tasks queue options given by the caller.
 * @returns The tasks queue options with the defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
681
682 /**
683 * Whether the pool is full or not.
684 *
685 * The pool filling boolean status.
686 */
protected get full (): boolean {
  // Full once the worker node count reaches the pool maximum size.
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
690
691 /**
692 * Whether the pool is busy or not.
693 *
694 * The pool busyness boolean status.
695 */
696 protected abstract get busy (): boolean
697
698 /**
699 * Whether all ready worker nodes are concurrently executing their full tasks quota or not.
700 *
701 * @returns Worker nodes busyness boolean status.
702 */
protected internalBusy (): boolean {
  // With the tasks queue enabled a ready worker node has spare capacity while it
  // executes fewer tasks than the configured concurrency; otherwise while it executes none.
  const hasSpareCapacity =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  return !this.workerNodes.some(hasSpareCapacity)
}
722
723 /** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node holding a non-empty task functions list wins.
  const nodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return nodeWithTaskFunctions?.info.taskFunctions ?? []
}
735
736 /** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  // Rejects with:
  // - Error when the pool is not started or the task function name is not registered;
  // - TypeError when `name` is not a string or is blank, or `transferList` is not an array.
  return await new Promise<Response>((resolve, reject) => {
    // Each validation failure rejects AND returns. Calling reject() does not
    // stop the executor, so without the return an invalid call would still
    // choose (and possibly create) a worker node, register the promise in the
    // response map and dispatch or enqueue the task.
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (name != null && name.trim().length === 0) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Taken before the worker node choice; used later to compute the task wait time.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // Store the settle callbacks under the task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away when the queue is disabled, or when it is enabled but
    // the worker node has an empty queue and runs fewer tasks than the
    // configured concurrency; otherwise enqueue.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(workerNodeKey) === 0 &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
798
799 /** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy every worker node concurrently and wait for all of them.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  // The destroy event is emitted before the pool is flagged as not started.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
809
/**
 * Sends a kill message to the worker and waits for its kill status message.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id put in the kill message.
 * @returns A promise resolved on a `'success'` kill status and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
825
826 /**
827 * Terminates the worker node given its worker node key.
828 *
829 * @param workerNodeKey - The worker node key.
830 */
831 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
832
833 /**
834 * Setup hook to execute code before worker nodes are created in the abstract constructor.
835 * Can be overridden.
836 *
837 * @virtual
838 */
protected setupHook (): void {
  // Intentionally empty: the base implementation does nothing.
}
842
843 /**
844 * Should return whether the worker is the main worker or not.
845 */
846 protected abstract isMain (): boolean
847
848 /**
849 * Hook executed before the worker task execution.
850 * Can be overridden.
851 *
852 * @param workerNodeKey - The worker node key.
853 * @param task - The task to execute.
854 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Worker node level usage: one more executing task, plus its wait time.
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  // Task function level usage, only when the worker node tracks it.
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
877
878 /**
879 * Hook executed after the worker task execution.
880 * Can be overridden.
881 *
882 * @param workerNodeKey - The worker node key.
883 * @param message - The received message.
884 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three usage updates in a fixed order.
  const applyUsageUpdates = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  // Worker node level usage.
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    applyUsageUpdates(workerUsage)
  }
  // Task function level usage, only when the worker node tracks it.
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    applyUsageUpdates(taskFunctionWorkerUsage)
  }
}
911
912 /**
913 * Whether the worker node shall update its task function worker usage or not.
914 *
915 * @param workerNodeKey - The worker node key.
916 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
917 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  // True only when the worker info lists more than two task functions.
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
926
/**
 * Updates the tasks counters of a worker usage once a task response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter never goes below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    tasks.executing -= 1
  }
  // A message carrying a task error counts as a failed task.
  if (message.taskError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
944
/**
 * Updates the run time statistics of a worker usage.
 * Nothing is updated when the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    const runTimeRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    // A missing run time measurement is recorded as 0.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
958
/**
 * Updates the wait time statistics of a worker usage.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp yields a wait time of 0.
  const submittedAt = task.timestamp ?? now
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - submittedAt
  )
}
971
/**
 * Updates the ELU (active, idle, utilization) statistics of a worker usage.
 * Nothing is updated when the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active/idle measurements are recorded as 0.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The stored utilization is averaged with the new one, or initialized with it.
  const previousUtilization = workerUsage.elu.utilization
  workerUsage.elu.utilization =
    previousUtilization != null
      ? (previousUtilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1004
1005 /**
1006 * Chooses a worker node for the next task.
1007 *
1008 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1009 *
1010 * @returns The chosen worker node key
1011 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  // The new dynamic worker node is used right away only when the strategy policy says so.
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1023
1024 /**
1025 * Conditions for dynamic worker creation.
1026 *
1027 * @returns Whether to create a dynamic worker or not.
1028 */
private shallCreateDynamicWorker (): boolean {
  // Only a dynamic pool that is not full may grow, and only when busy.
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1032
1033 /**
1034 * Sends a message to worker given its worker node key.
1035 *
1036 * @param workerNodeKey - The worker node key.
1037 * @param message - The message.
1038 * @param transferList - The optional array of transferable objects.
1039 */
1040 protected abstract sendToWorker (
1041 workerNodeKey: number,
1042 message: MessageValue<Data>,
1043 transferList?: TransferListItem[]
1044 ): void
1045
1046 /**
1047 * Creates a new worker.
1048 *
1049 * @returns Newly created worker.
1050 */
1051 protected abstract createWorker (): Worker
1052
1053 /**
1054 * Creates a new, completely set up worker node.
1055 *
1056 * @returns New, completely set up worker node key.
1057 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts

  // Handlers from the pool options, defaulting to a no-op.
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Pool level error handling.
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(
      failedWorkerNodeKey
    ) as WorkerInfo
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement is only created on a started pool that is not starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true &&
      !this.starting &&
      this.started
    if (shallRestart) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // On its first exit event the worker is removed from the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
1096
/**
 * Creates a worker node through `createAndSetupWorkerNode()` and turns it
 * into a dynamic one: a kill message listener is registered, the check-active
 * message is sent and the worker information is flagged as dynamic.
 *
 * @returns The key of the newly created and set up dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // Evaluated lazily: only needed for a soft kill request.
    const isIdle = (): boolean =>
      senderUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0))
    // Kill message received from worker: a hard kill always destroys the
    // worker node, a soft kill only when nothing is executing nor queued.
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isIdle())
    ) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  // Some strategy policies require the dynamic worker to be flagged ready
  // right away.
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1139
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Abstract: the concrete pool implementation provides the registration.
 * Within this class it is used to attach the listener returned by
 * `workerListener()` and the kill message listener of dynamic worker nodes.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1152
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and statistics
 * messages and, when the tasks queue is enabled, binds the task stealing
 * callbacks on the worker node.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message, then the statistics message, to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1173
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Abstract: the concrete pool implementation provides the message.
 * Within this class it is called by `afterWorkerNodeSetup()`, right after
 * the worker message listener has been registered.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1180
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId: workerInfo.id as number
  })
}
1198
/**
 * Moves the tasks queued on the given worker node to the other ready worker
 * nodes.
 *
 * For each queued task the destination is the first ready worker node with
 * no queued task or, failing that, the ready worker node with the fewest
 * queued tasks. The task is executed right away when the destination has an
 * empty tasks queue and is below the configured concurrency, otherwise it is
 * enqueued on the destination.
 *
 * If no other ready worker node exists, the redistribution stops and the
 * remaining tasks are left in the source tasks queue.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
        if (workerNode.usage.tasks.queued === 0) {
          // Cannot do better than an empty tasks queue.
          destinationWorkerNodeKey = workerNodeId
          break
        }
        if (workerNode.usage.tasks.queued < minQueuedTasks) {
          minQueuedTasks = workerNode.usage.tasks.queued
          destinationWorkerNodeKey = workerNodeId
        }
      }
    }
    if (destinationWorkerNodeKey == null) {
      // No eligible destination: nothing would be dequeued, so looping again
      // would never terminate.
      break
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (
      this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
      destinationWorkerNode.usage.tasks.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1233
/**
 * Steals one task for the worker with the given id from another worker node.
 *
 * The source is the ready worker node, other than the given worker, with the
 * most queued tasks. Nothing happens when no such worker node has a queued
 * task. The task obtained from the source `popTask()` is executed right away
 * when the destination has an empty tasks queue and is below the configured
 * concurrency, otherwise it is enqueued on the destination.
 *
 * @param workerId - The id of the worker receiving the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  // Sort a copy by descending number of queued tasks and take the first
  // eligible worker node.
  const sourceWorkerNode = [...this.workerNodes]
    .sort((nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued)
    .find(
      (candidate) =>
        candidate.info.ready &&
        candidate.info.id !== workerId &&
        candidate.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = {
    ...(sourceWorkerNode.popTask() as Task<Data>),
    workerId: destinationWorkerNode.info.id as number
  }
  const canExecuteNow =
    this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
    destinationWorkerNode.usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteNow) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
}
1269
/**
 * Offloads queued tasks from the worker with the given id to the other
 * worker nodes.
 *
 * The other worker nodes are visited from the least to the most queued
 * tasks. Each ready one whose number of queued tasks is below the configured
 * tasks queue size minus one receives one task obtained from the source
 * `popTask()`, as long as the source still has queued tasks. The task is
 * executed right away when the receiving worker node has an empty tasks
 * queue and is below the configured concurrency, otherwise it is enqueued.
 *
 * @param workerId - The id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - 1
    ) {
      // The position in the sorted copy is not a worker node key: look the
      // key up in the pool worker nodes so that the task reaches the worker
      // whose id it is stamped with.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (
        this.tasksQueueSize(workerNodeKey) === 0 &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
      ) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
    }
  }
}
1303
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches on the message
 * content: worker ready response, task execution response or task functions
 * update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ) as WorkerInfo
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1328
/**
 * Handles a worker ready response: stores the ready flag and the task
 * functions in the worker information, then emits the pool `ready` event
 * when an emitter exists and the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message `ready` flag is `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1342
/**
 * Handles a task execution response: settles the pending promise registered
 * for the task id, runs the after task execution hook, then executes the
 * next queued task of the worker node if the tasks queue is enabled and the
 * configured concurrency allows it.
 *
 * Messages whose task id has no pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  if (taskError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    // The promise is rejected with the error message, not the error object.
    pendingResponse.reject(taskError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteNextQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteNextQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1370
/**
 * Emits the pool `busy` event, with the pool information, when the pool is
 * busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1376
/**
 * Emits the pool `backPressure` event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1382
/**
 * Emits the pool `full` event, with the pool information, when the pool is
 * dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1390
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if there is no worker node at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1400
/**
 * Wraps the given worker in a worker node and adds it to the pool worker
 * nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Size from the tasks queue options, defaulting to the squared pool
  // maximum size.
  const tasksQueueSizeOption =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    tasksQueueSizeOption
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node added not found')
}
1425
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1438
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queue there is no back pressure.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1446
/**
 * Tells whether the pool has back pressure.
 *
 * @returns `true` if the tasks queue is enabled and no worker node is free of back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1455
/**
 * Executes the given task on the worker given its worker node key: runs the
 * before task execution hook, sends the task to the worker with its transfer
 * list, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1467
/**
 * Enqueues the given task on the worker node given its key, then checks and
 * emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1473
/**
 * Dequeues a task from the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `dequeueTask()`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1477
/**
 * Gets the tasks queue size of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1481
/**
 * Executes every task queued on the worker node given its key, then clears
 * its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1491
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1497 }