9aa6ec3dfeab90f8ba462920290f8a7b74b91f80
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import {
25 type IPool,
26 PoolEmitter,
27 PoolEvents,
28 type PoolInfo,
29 type PoolOptions,
30 type PoolType,
31 PoolTypes,
32 type TasksQueueOptions
33 } from './pool'
34 import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerType,
39 WorkerUsage
40 } from './worker'
41 import {
42 type MeasurementStatisticsRequirements,
43 Measurements,
44 WorkerChoiceStrategies,
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
47 } from './selection-strategies/selection-strategies-types'
48 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
49 import { version } from './version'
50 import { WorkerNode } from './worker-node'
51
52 /**
53 * Base class that implements some shared logic for all poolifier pools.
54 *
55 * @typeParam Worker - Type of worker which manages this pool.
56 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
57 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
58 */
59 export abstract class AbstractPool<
60 Worker extends IWorker,
61 Data = unknown,
62 Response = unknown
63 > implements IPool<Worker, Data, Response> {
64 /** @inheritDoc */
65 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
66
67 /** @inheritDoc */
68 public readonly emitter?: PoolEmitter
69
70 /**
71 * The task execution response promise map:
72 * - `key`: The message id of each submitted task.
73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
74 *
75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
76 */
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
79
80 /**
81 * Worker choice strategy context referencing a worker choice algorithm implementation.
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
84 Worker,
85 Data,
86 Response
87 >
88
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
94 /**
95 * Whether the pool is started or not.
96 */
97 private started: boolean
98 /**
99 * Whether the pool is starting or not.
100 */
101 private starting: boolean
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
107 /**
108 * Constructs a new poolifier pool.
109 *
110 * @param numberOfWorkers - Number of workers that this pool should manage.
111 * @param filePath - Path to the worker file.
112 * @param opts - Options for the pool.
113 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // A pool cannot be created from a worker of the same type as the pool.
  const createdFromMain = this.isMain()
  if (!createdFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Validate the arguments. checkPoolOptions() also writes the option
  // defaults into this.opts, so it must run before the options are read.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind these methods to the pool instance.
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  // Hook that runs before any worker node is created.
  this.setupHook()

  this.starting = false
  this.started = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  // Reference point used by the utilization computation.
  this.startTimestamp = performance.now()
}
155
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is not a non-blank string or the file does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null`, `undefined` and any non-string value fail the `typeof` test.
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
168
/**
 * Validates the number of workers given to the constructor.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If it is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If it is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
186
/**
 * Validates the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `max` is missing or not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `max` is zero, lower than `min` or equal to `min`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
212
/**
 * Validates the pool options and writes the default values into `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy, round robin unless specified.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Strategy options are validated after being merged over the defaults.
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const requestedQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(requestedQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
241
/**
 * Checks that the given value is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
251
/**
 * Validates worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // When weights are given, their count must equal the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
295
/**
 * Validates tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const size = tasksQueueOptions?.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
332
333 /** @inheritDoc */
public get info (): PoolInfo {
  // The statistics requirements decide which optional sections are reported.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const nodes = this.workerNodes

  // Adds up one numeric figure across all worker nodes.
  const sumOf = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number => {
    let total = 0
    for (const workerNode of nodes) {
      total += pick(workerNode)
    }
    return total
  }
  // Concatenates one measurement history per worker node into a single list.
  const historiesOf = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number[]
  ): number[] => {
    let merged: number[] = []
    for (const workerNode of nodes) {
      merged = merged.concat(pick(workerNode))
    }
    return merged
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run and wait times are aggregated.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: nodes.length,
    idleWorkerNodes: nodes.filter(
      workerNode => workerNode.usage.tasks.executing === 0
    ).length,
    busyWorkerNodes: nodes.filter(
      workerNode => workerNode.usage.tasks.executing > 0
    ).length,
    executedTasks: sumOf(workerNode => workerNode.usage.tasks.executed),
    executingTasks: sumOf(workerNode => workerNode.usage.tasks.executing),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOf(workerNode => workerNode.usage.tasks.queued),
      maxQueuedTasks: sumOf(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOf(workerNode => workerNode.usage.tasks.stolen)
    }),
    failedTasks: sumOf(workerNode => workerNode.usage.tasks.failed),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...nodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...nodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(runTimeRequirements.average && {
          average: round(
            average(historiesOf(workerNode => workerNode.usage.runTime.history))
          )
        }),
        ...(runTimeRequirements.median && {
          median: round(
            median(historiesOf(workerNode => workerNode.usage.runTime.history))
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...nodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...nodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(waitTimeRequirements.average && {
          average: round(
            average(
              historiesOf(workerNode => workerNode.usage.waitTime.history)
            )
          )
        }),
        ...(waitTimeRequirements.median && {
          median: round(
            median(historiesOf(workerNode => workerNode.usage.waitTime.history))
          )
        })
      }
    })
  }
}
488
489 /**
490 * The pool readiness boolean status.
491 */
private get ready (): boolean {
  // Only non-dynamic worker nodes flagged as ready are counted.
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
503
504 /**
505 * The approximate pool utilization.
506 *
507 * @returns The pool utilization.
508 */
private get utilization (): number {
  // Time elapsed since the start timestamp, multiplied by the maximum pool size.
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
524
525 /**
526 * The pool type.
527 *
528 * If it is `'dynamic'`, it provides the `max` property.
529 */
530 protected abstract get type (): PoolType
531
532 /**
533 * The worker type.
534 */
535 protected abstract get worker (): WorkerType
536
537 /**
538 * The pool minimum size.
539 */
protected get minSize (): number {
  // The minimum size is the number of workers given to the constructor.
  const { numberOfWorkers } = this
  return numberOfWorkers
}
543
544 /**
545 * The pool maximum size.
546 */
protected get maxSize (): number {
  // Falls back to the number of workers when `max` is not set.
  const { max: maximumSize } = this
  return maximumSize != null ? maximumSize : this.numberOfWorkers
}
550
551 /**
552 * Checks if the worker id sent in the received message from a worker is valid.
553 *
554 * @param message - The received message.
555 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
556 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // The worker id must belong to one of the pool worker nodes.
  const isKnownWorker = this.getWorkerNodeKeyByWorkerId(workerId) !== -1
  if (!isKnownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
569
570 /**
571 * Gets the worker node key of the given worker.
572 *
573 * @param worker - The worker.
574 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
575 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  // Linear scan comparing worker instances by identity.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
581
582 /**
583 * Gets the worker node key given its worker id.
584 *
585 * @param workerId - The worker id.
586 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
587 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  // Linear scan comparing the worker id stored in each worker node info.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
593
594 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  // Reject unknown strategies before touching any state.
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node gets its usage reset and a statistics message sent.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
612
613 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // The given options are validated as passed, before the defaults are merged in.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
626
627 /** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the tasks queues when the queue goes from enabled to disabled.
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
638
639 /** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: remove any tasks queue options still stored.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const effectiveOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = effectiveOptions
  // Propagate the queue size to every worker node.
  this.setTasksQueueSize(effectiveOptions.size as number)
}
650
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The value assigned to each worker node `tasksQueueBackPressureSize`.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
656
/**
 * Builds the effective tasks queue options by applying the given options over the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  // Defaults: queue size is the square of the pool maximum size.
  const defaultTasksQueueOptions = {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
670
671 /**
672 * Whether the pool is full or not.
673 *
674 * The pool filling boolean status.
675 */
protected get full (): boolean {
  // Full once the number of worker nodes reaches the pool maximum size.
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
679
680 /**
681 * Whether the pool is busy or not.
682 *
683 * The pool busyness boolean status.
684 */
685 protected abstract get busy (): boolean
686
687 /**
688 * Whether worker nodes are executing concurrently their tasks quota or not.
689 *
690 * @returns Worker nodes busyness boolean status.
691 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    // With a tasks queue, a ready worker node has capacity while it executes
    // fewer tasks than the configured concurrency.
    const hasWorkerNodeWithCapacity = this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
    return !hasWorkerNodeWithCapacity
  }
  // Without a tasks queue, a ready worker node is free when it executes nothing.
  const hasFreeWorkerNode = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasFreeWorkerNode
}
710
711 /** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node with a non-empty task functions list provides the answer.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
723
/**
 * Whether a task can be executed right away on the given worker node instead of being queued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and it executes fewer tasks than the configured concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return executing < (this.opts.tasksQueueOptions?.concurrency as number)
}
731
732 /** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // State and argument checks: the first failing one rejects the promise.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }

    // The timestamp is taken before the worker node is chosen.
    const submissionTimestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerId = this.getWorkerInfo(workerNodeKey).id as number
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp: submissionTimestamp,
      workerId,
      taskId
    }
    // Store the settle callbacks under the task id.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey
    })

    const tasksQueueEnabled = this.opts.enableTasksQueue
    const executeNow =
      tasksQueueEnabled === false ||
      (tasksQueueEnabled === true && this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
786
787 /** @inheritDoc */
public start (): void {
  // Dynamic worker nodes do not count towards the number of workers to create.
  const nonDynamicWorkerNodesCount = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length

  this.starting = true
  while (nonDynamicWorkerNodesCount() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
802
803 /** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy all worker nodes concurrently and wait for all of them.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey): Promise<void> => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  // The destroy event is emitted before the pool is flagged as not started.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
813
/**
 * Sends a kill message to a worker and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, message => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
829
830 /**
831 * Terminates the worker node given its worker node key.
832 *
833 * @param workerNodeKey - The worker node key.
834 */
835 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
836
837 /**
838 * Setup hook to execute code before worker nodes are created in the abstract constructor.
839 * Can be overridden.
840 *
841 * @virtual
842 */
protected setupHook (): void {
  // No-op by default.
}
846
847 /**
848 * Should return whether the worker is the main worker or not.
849 */
850 protected abstract isMain (): boolean
851
852 /**
853 * Hook executed before the worker task execution.
854 * Can be overridden.
855 *
856 * @param workerNodeKey - The worker node key.
857 * @param task - The task to execute.
858 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node level usage: one more executing task, plus its wait time.
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    workerUsage.tasks.executing += 1
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  // Task function level usage, only when it is tracked for this worker node.
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      task.name as string
    )
    if (taskFunctionWorkerUsage != null) {
      taskFunctionWorkerUsage.tasks.executing += 1
      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
    }
  }
}
881
882 /**
883 * Hook executed after the worker task execution.
884 * Can be overridden.
885 *
886 * @param workerNodeKey - The worker node key.
887 * @param message - The received message.
888 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node level usage: task counters, run time and ELU.
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  // Task function level usage, only when it is tracked for this worker node.
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionName = message.taskPerformance?.name as string
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskFunctionName)
    if (taskFunctionWorkerUsage != null) {
      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
    }
  }
}
915
916 /**
917 * Whether the worker node shall update its task function worker usage or not.
918 *
919 * @param workerNodeKey - The worker node key.
920 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
921 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return false
  }
  const { taskFunctions } = workerInfo
  // Requires a task functions list holding more than two entries.
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
930
/**
 * Updates the task counters of a worker usage once a task response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter never goes below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    tasks.executing -= 1
  }
  // A message carrying a task error counts as failed, any other as executed.
  if (message.taskError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
948
/**
 * Updates the run time statistics of a worker usage. Does nothing if the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    const runTimeRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    // A missing run time is recorded as zero.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
962
/**
 * Updates the wait time statistics of a worker usage.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp gets a wait time of zero.
  const taskTimestamp = task.timestamp ?? now
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    now - taskTimestamp
  )
}
975
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage. Does nothing if the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active or idle figures are recorded as zero.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0
  )
  if (eluTaskStatisticsRequirements.aggregate && taskElu != null) {
    // The stored utilization is averaged with the new one, or initialized from it.
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
1008
1009 /**
1010 * Chooses a worker node for the next task.
1011 *
1012 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1013 *
1014 * @returns The chosen worker node key
1015 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const { dynamicWorkerUsage } =
      this.workerChoiceStrategyContext.getStrategyPolicy()
    // The new dynamic worker node is only returned directly if the strategy policy says so.
    if (dynamicWorkerUsage) {
      return dynamicWorkerNodeKey
    }
  }
  // Otherwise the worker choice strategy picks the worker node.
  return this.workerChoiceStrategyContext.execute()
}
1027
1028 /**
1029 * Conditions for dynamic worker creation.
1030 *
1031 * @returns Whether to create a dynamic worker or not.
1032 */
private shallCreateDynamicWorker (): boolean {
  // Only a dynamic pool that is not full creates a worker, and only when busy.
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1036
1037 /**
1038 * Sends a message to worker given its worker node key.
1039 *
1040 * @param workerNodeKey - The worker node key.
1041 * @param message - The message.
1042 * @param transferList - The optional array of transferable objects.
1043 */
1044 protected abstract sendToWorker (
1045 workerNodeKey: number,
1046 message: MessageValue<Data>,
1047 transferList?: TransferListItem[]
1048 ): void
1049
1050 /**
1051 * Creates a new worker.
1052 *
1053 * @returns Newly created worker.
1054 */
1055 protected abstract createWorker (): Worker
1056
1057 /**
1058 * Creates a new, completely set up worker node.
1059 *
1060 * @returns New, completely set up worker node key.
1061 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts

  // Handlers from the pool options are registered before the internal ones.
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement is only created for a started pool that is not starting.
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true &&
      this.started &&
      !this.starting
    if (shallRestartWorker) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    // Queued tasks of the failed worker node are redistributed.
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
1100
/**
 * Creates a fully set up worker node and turns it into a dynamic one: a kill
 * message listener is registered, the worker is asked to check its activity
 * and the worker information is flagged as dynamic.
 *
 * @returns The key of the newly created dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const dynamicWorkerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(dynamicWorkerNodeKey, message => {
    // Worker node keys may have shifted, so resolve the key from the worker id.
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // A hard kill request is always honored.
    let shallDestroy = isKillBehavior(KillBehaviors.HARD, message.kill)
    // A soft kill request is honored only once the worker has nothing left to do.
    if (!shallDestroy && isKillBehavior(KillBehaviors.SOFT, message.kill)) {
      const notExecuting = senderUsage.tasks.executing === 0
      if (this.opts.enableTasksQueue === false) {
        shallDestroy = notExecuting
      } else if (this.opts.enableTasksQueue === true) {
        shallDestroy =
          notExecuting && this.tasksQueueSize(senderWorkerNodeKey) === 0
      }
    }
    if (!shallDestroy) {
      return
    }
    this.destroyWorkerNode(senderWorkerNodeKey).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const dynamicWorkerInfo = this.getWorkerInfo(dynamicWorkerNodeKey)
  this.sendToWorker(dynamicWorkerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  // The strategy policy may ask for dynamic workers to be flagged ready at once.
  const strategyContext = this.workerChoiceStrategyContext
  const flagAsReady =
    strategyContext.getStrategyPolicy().dynamicWorkerReady ||
    strategyContext.getStrategyPolicy().dynamicWorkerUsage
  if (flagAsReady) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return dynamicWorkerNodeKey
}
1143
/**
 * Attaches a message listener callback to the worker of the given worker node.
 * Concrete pools provide the implementation.
 *
 * @param workerNodeKey - Key of the worker node whose worker is listened to.
 * @param listener - Callback invoked with each message value.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1156
/**
 * Hook run once a worker node has been newly created: registers the pool
 * message listener, sends the startup and statistics messages and, when the
 * tasks queue is enabled, installs the configured task stealing callbacks.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Task stealing callbacks are only relevant with the tasks queue enabled.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].onEmptyQueue =
      this.taskStealingOnEmptyQueue.bind(this)
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].onBackPressure =
      this.tasksStealingOnBackPressure.bind(this)
  }
}
1181
/**
 * Sends the startup message to the worker of the given worker node.
 * Concrete pools provide the implementation.
 *
 * @param workerNodeKey - Key of the worker node whose worker receives the startup message.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1188
/**
 * Tells the worker of the given worker node which task statistics (run time,
 * ELU) the current worker choice strategy requires to be aggregated.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const strategyContext = this.workerChoiceStrategyContext
  const runTimeAggregate =
    strategyContext.getTaskStatisticsRequirements().runTime.aggregate
  const eluAggregate =
    strategyContext.getTaskStatisticsRequirements().elu.aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    },
    workerId: this.getWorkerInfo(workerNodeKey).id as number
  })
}
1206
/**
 * Moves every task queued on the given worker node to other worker nodes.
 *
 * Each task goes to the ready worker node, other than the source one, that
 * currently has the fewest queued tasks (lowest key wins on ties). The task
 * is executed at once when the destination can take it, enqueued otherwise.
 *
 * The source worker node is never selected as a destination: picking it
 * would either send the task to the worker being drained or re-enqueue the
 * task onto the queue being emptied, which never terminates. When no other
 * ready worker node exists, the remaining tasks are left in place.
 *
 * @param workerNodeKey - Key of the worker node whose tasks queue is drained.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: stop instead of looping on the source node.
      break
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    // Re-stamp the task with the id of the worker that now owns it.
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1231
/**
 * Increments the stolen tasks counters of the given worker node: the worker
 * node usage one and, when task function usage shall be updated and exists
 * for the task name, the task function one.
 *
 * Does nothing when no worker node exists at the given key.
 *
 * @param workerNodeKey - Key of the worker node that received the stolen task.
 * @param taskName - Name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // One guard for the whole method so a missing node cannot be dereferenced below.
  if (workerNode == null) {
    return
  }
  if (workerNode.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Look the task function usage up once and reuse the result.
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    taskName
  )
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1250
/**
 * Callback installed as a worker node `onEmptyQueue` handler: takes one task
 * from the ready worker node, other than the given worker, that has the most
 * queued tasks and gives it to the given worker.
 *
 * @param workerId - Id of the worker that receives the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const thiefWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const thiefWorkerNode = this.workerNodes[thiefWorkerNodeKey]
  // Work on a copy ordered by decreasing number of queued tasks.
  const byQueuedDescending = [...this.workerNodes].sort(
    (left, right) => right.usage.tasks.queued - left.usage.tasks.queued
  )
  const victimWorkerNode = byQueuedDescending.find(
    candidate =>
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
  )
  if (victimWorkerNode == null) {
    // Nothing to steal.
    return
  }
  // Re-stamp the stolen task with the id of its new worker.
  const stolenTask = {
    ...(victimWorkerNode.popTask() as Task<Data>),
    workerId: thiefWorkerNode.info.id as number
  }
  if (this.shallExecuteTask(thiefWorkerNodeKey)) {
    this.executeTask(thiefWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(thiefWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    thiefWorkerNodeKey,
    stolenTask.name as string
  )
}
1282
/**
 * Callback installed as a worker node `onBackPressure` handler: hands tasks
 * queued on the given worker over to other ready worker nodes, least loaded
 * first, as long as the source still has queued tasks and the candidate has
 * fewer queued tasks than the configured tasks queue size minus one.
 *
 * Each candidate is paired with its key in the pool worker nodes before
 * sorting, so the key used to execute or enqueue the task designates the
 * worker node the task was stamped for and not a position in the sorted copy.
 *
 * @param workerId - Id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueSizeOption = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueSizeOption <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Keep the real worker node key attached to each node while sorting.
  const candidates = Array.from(this.workerNodes.entries()).sort(
    ([, workerNodeA], [, workerNodeB]) =>
      workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
  )
  for (const [workerNodeKey, workerNode] of candidates) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueSizeOption - sizeOffset
    ) {
      // Re-stamp the task with the id of the worker that takes it over.
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1320
/**
 * Builds the listener registered for each worker message. The listener
 * checks the message worker id, then dispatches on the message content:
 * worker ready response, task execution response or task functions update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions, workerId } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
      this.getWorkerInfo(workerNodeKey).taskFunctions = taskFunctions
    }
  }
}
1343
/**
 * Handles a worker ready response: stores the ready flag and the task
 * functions in the worker information, then emits the pool ready event when
 * an emitter exists and the pool is ready.
 *
 * @param message - The worker ready response.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1357
/**
 * Handles a task execution response: settles the promise registered for the
 * task id, runs the after task execution hook, updates the worker choice
 * strategy context, forgets the promise and, when the tasks queue is enabled,
 * starts the next queued task if the worker is below its concurrency limit.
 * Responses whose task id has no registered promise are ignored.
 *
 * @param message - The task execution response.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  if (taskError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    // The promise is rejected with the error message, not the error object.
    pendingResponse.reject(taskError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteNextQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteNextQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
1385
/**
 * Emits the pool busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1391
/**
 * Emits the pool back pressure event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1397
/**
 * Emits the pool full event, with the pool information, when the pool is
 * dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1405
/**
 * Looks up the information of the worker held by the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information stored on that worker node.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1415
/**
 * Wraps the given worker in a worker node and appends it to the pool worker
 * nodes. The worker node is built with the configured tasks queue size, or
 * the square of the pool maximum size when none is configured.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueSizeOption =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueSizeOption
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1439
/**
 * Drops the worker node holding the given worker from the pool worker nodes
 * and from the worker choice strategy context. Does nothing when the worker
 * is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1452
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only reported with the tasks queue enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1460
/**
 * Tells whether the pool has back pressure, i.e. the tasks queue is enabled
 * and no worker node is free of back pressure.
 *
 * @returns `true` if every worker node has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1469
/**
 * Runs the before task execution hook, sends the task (with its transfer
 * list) to the worker of the given worker node, then checks and emits the
 * task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1481
/**
 * Enqueues the task on the given worker node, then checks and emits the task
 * queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1487
/**
 * Dequeues a task from the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1491
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1495
/**
 * Executes every task queued on the given worker node, then clears its tasks
 * queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1505
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1511 }