fix: fix task stealing related options handling at runtime
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import {
25 type IPool,
26 PoolEmitter,
27 PoolEvents,
28 type PoolInfo,
29 type PoolOptions,
30 type PoolType,
31 PoolTypes,
32 type TasksQueueOptions
33 } from './pool'
34 import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerType,
39 WorkerUsage
40 } from './worker'
41 import {
42 type MeasurementStatisticsRequirements,
43 Measurements,
44 WorkerChoiceStrategies,
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
47 } from './selection-strategies/selection-strategies-types'
48 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
49 import { version } from './version'
50 import { WorkerNode } from './worker-node'
51
52 /**
53 * Base class that implements some shared logic for all poolifier pools.
54 *
55 * @typeParam Worker - Type of worker which manages this pool.
56 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
57 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
58 */
59 export abstract class AbstractPool<
60 Worker extends IWorker,
61 Data = unknown,
62 Response = unknown
63 > implements IPool<Worker, Data, Response> {
64 /** @inheritDoc */
65 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
66
67 /** @inheritDoc */
68 public readonly emitter?: PoolEmitter
69
70 /**
71 * The task execution response promise map:
72 * - `key`: The message id of each submitted task.
73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
74 *
75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
76 */
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
79
80 /**
81 * Worker choice strategy context referencing a worker choice algorithm implementation.
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
84 Worker,
85 Data,
86 Response
87 >
88
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
94 /**
95 * Whether the pool is started or not.
96 */
97 private started: boolean
98 /**
99 * Whether the pool is starting or not.
100 */
101 private starting: boolean
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
/**
 * Creates a poolifier pool: validates the arguments, fills in the option defaults,
 * builds the worker choice strategy context and starts the workers when `startWorkers` is `true`.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not instantiated from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const instantiatedFromMain = this.isMain()
  if (!instantiatedFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. checkPoolOptions() also writes the defaults into this.opts.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below so they can be passed around as plain functions.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  const eventsEnabled = this.opts.enableEvents === true
  if (eventsEnabled) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Extension point for subclasses, run before any worker node is created.
  this.setupHook()

  this.started = false
  this.starting = false
  const startImmediately = this.opts.startWorkers === true
  if (startImmediately) {
    this.start()
  }

  // Reference point used by the utilization computation.
  this.startTimestamp = performance.now()
}
155
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is nullish, is not a string, is blank, or does not exist.
 */
private checkFilePath (filePath: string): void {
  const isBlankString =
    typeof filePath === 'string' && filePath.trim().length === 0
  if (filePath == null || typeof filePath !== 'string' || isBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const fileFound = existsSync(filePath)
  if (!fileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
168
/**
 * Validates the number of workers given at pool construction.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is nullish.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused an empty set of workers.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
186
/**
 * Validates the size boundaries of a dynamic pool. Does nothing for any other pool type.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is nullish or not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, lower than the minimum size or equal to it.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
212
/**
 * Validates the pool options and writes the default value of each unset option into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.startWorkers = opts.startWorkers ?? true

  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // User supplied strategy options take precedence over the defaults.
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
241
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
251
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from the pool maximum size or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  if (workerChoiceStrategyOptions.retries != null) {
    if (!Number.isSafeInteger(workerChoiceStrategyOptions.retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (workerChoiceStrategyOptions.retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
      )
    }
  }

  if (workerChoiceStrategyOptions.weights != null) {
    // One weight is expected per worker node, counted against the pool maximum size.
    const weightsCount = Object.keys(workerChoiceStrategyOptions.weights).length
    if (weightsCount !== this.maxSize) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }

  if (workerChoiceStrategyOptions.measurement != null) {
    const knownMeasurements = Object.values(Measurements)
    if (!knownMeasurements.includes(workerChoiceStrategyOptions.measurement)) {
      throw new Error(
        `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
      )
    }
  }
}
295
/**
 * Validates the tasks queue options. Nullish options and nullish `concurrency`/`size` are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency`/`size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency`/`size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }

  if (tasksQueueOptions?.concurrency != null) {
    if (!Number.isSafeInteger(tasksQueueOptions?.concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (tasksQueueOptions?.concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
      )
    }
  }

  if (tasksQueueOptions?.size != null) {
    if (!Number.isSafeInteger(tasksQueueOptions?.size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (tasksQueueOptions?.size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
      )
    }
  }
}
332
/** @inheritDoc */
public get info (): PoolInfo {
  // NOTE(review): read once here; assumes getTaskStatisticsRequirements() is a side effect free accessor.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Adds up one numeric value picked from each worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )

  // Concatenates the measurement history of every worker node.
  const gatherHistory = (measurement: 'runTime' | 'waitTime'): number[] =>
    this.workerNodes.reduce<number[]>(
      (history, workerNode) =>
        history.concat(workerNode.usage[measurement].history),
      []
    )

  // Builds the pool wide statistics of one measurement.
  const buildMeasurementInfo = (measurement: 'runTime' | 'waitTime') => ({
    minimum: round(
      min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    ...(taskStatisticsRequirements[measurement].average && {
      average: round(average(gatherHistory(measurement)))
    }),
    ...(taskStatisticsRequirements[measurement].median && {
      median: round(median(gatherHistory(measurement)))
    })
  })

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: buildMeasurementInfo('runTime')
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: buildMeasurementInfo('waitTime')
    })
  }
}
488
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of ready, non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
503
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated tasks run time plus wait time of all worker nodes,
 * divided by the time elapsed since the start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
524
/**
 * The type of this pool, supplied by the concrete pool implementation.
 *
 * A pool whose type is `'dynamic'` provides the `max` property.
 */
protected abstract get type (): PoolType
531
/**
 * The type of worker used by this pool, supplied by the concrete pool implementation.
 */
protected abstract get worker (): WorkerType
536
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
543
/**
 * The pool maximum size: the `max` property when it is set, the number of workers otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
550
/**
 * Ensures that a message received from a worker carries the id of a worker known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  if (message.workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  if (senderWorkerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
569
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
581
/**
 * Looks up the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].info.id === workerId) {
      return key
    }
  }
  return -1
}
593
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  const optionsGiven = workerChoiceStrategyOptions != null
  if (optionsGiven) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node gets its usage reset and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
612
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given options take precedence over the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
626
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disablingEnabledQueue = this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    // Tasks stealing is switched off and the queues are flushed before the queue gets disabled.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
640
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue not enabled: stored tasks queue options, if any, are dropped.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }

  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueSize(builtTasksQueueOptions.size as number)

  // The worker nodes stealing callbacks are aligned with the new options.
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
662
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The value assigned to each worker node `tasksQueueBackPressureSize`.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
668
/**
 * Installs the task stealing callback as the `onEmptyQueue` handler of every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
675
/**
 * Removes the `onEmptyQueue` handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
681
/**
 * Installs the tasks stealing callback as the `onBackPressure` handler of every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
688
/**
 * Removes the `onBackPressure` handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
694
/**
 * Builds the effective tasks queue options by layering the given options over the defaults.
 *
 * Defaults: `size` is the squared pool maximum size, `concurrency` is `1`,
 * `taskStealing` and `tasksStealingOnBackPressure` are `true`.
 *
 * @param tasksQueueOptions - The given tasks queue options.
 * @returns The tasks queue options with defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: this.maxSize ** 2,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
708
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
717
/**
 * Whether the pool is busy or not, as defined by the concrete pool implementation.
 *
 * The pool busyness boolean status.
 */
protected abstract get busy (): boolean
724
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has spare capacity while it executes fewer tasks
 * than the configured concurrency; otherwise it has spare capacity only when it executes no task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const hasSpareCapacity = (
    workerNode: IWorkerNode<Worker, Data>
  ): boolean => {
    if (tasksQueueEnabled) {
      return (
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
      )
    }
    return workerNode.info.ready && workerNode.usage.tasks.executing === 0
  }
  return !this.workerNodes.some(hasSpareCapacity)
}
748
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node advertising a non empty task functions list provides the answer.
  const advertisingWorkerNode = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return advertisingWorkerNode?.info.taskFunctions ?? []
}
761
/**
 * Whether a task shall be executed right away on the given worker node rather than enqueued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and it executes fewer tasks than the configured concurrency, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
769
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Arguments and pool state validation: any failure rejects the returned promise.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }

    // The timestamp is taken before the worker node choice.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }
    // The callbacks are stored under the task id, to be used when the worker response is received.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })

    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
824
/** @inheritdoc */
public start (): void {
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length

  this.starting = true
  // Worker nodes are created until the non dynamic ones reach the configured number of workers.
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
840
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All worker nodes destructions are launched, then awaited together.
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  // The destroy event is emitted before the pool is flagged as not started.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
851
/**
 * Sends a kill message to the given worker and waits for its acknowledgement.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved when the worker answers `kill: 'success'` and rejected when it answers `kill: 'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, message => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
867
/**
 * Terminates the worker node identified by its worker node key.
 * Implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
874
/**
 * Hook called by the abstract constructor before any worker node is created.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
884
/**
 * Should return whether the worker is the main worker or not.
 * The constructor throws when it returns `false`.
 */
protected abstract isMain (): boolean
889
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time statistics,
 * on the worker usage and, when applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // NOTE(review): looked up once; assumes getTaskFunctionWorkerUsage() returns the same usage on repeated calls.
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
919
/**
 * Hook executed after the worker task execution.
 * Updates the tasks, run time and ELU statistics on the worker usage and,
 * when applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const refreshUsage = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }

  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    refreshUsage(workerUsage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // NOTE(review): looked up once; assumes getTaskFunctionWorkerUsage() returns the same usage on repeated calls.
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    refreshUsage(taskFunctionWorkerUsage)
  }
}
953
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * That is the case when the worker info lists more than two task functions.
 * NOTE(review): the threshold of two presumably accounts for entries always present in that list - confirm against the worker implementation.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null || !Array.isArray(workerInfo.taskFunctions)) {
    return false
  }
  return workerInfo.taskFunctions.length > 2
}
968
/**
 * Updates the tasks counters of a worker usage once a task execution response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const tasksCounters = workerUsage.tasks
  // The executing counter is never decremented below zero.
  const hasExecutingTasks =
    tasksCounters.executing != null && tasksCounters.executing > 0
  if (hasExecutingTasks) {
    --tasksCounters.executing
  }
  const taskFailed = message.taskError != null
  if (taskFailed) {
    ++tasksCounters.failed
  } else {
    ++tasksCounters.executed
  }
}
986
/**
 * Updates the run time statistics of a worker usage. Does nothing when the task failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      taskRunTime
    )
  }
}
1000
/**
 * Updates the wait time statistics of a worker usage.
 *
 * The wait time is the time elapsed since the task timestamp, zero when the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskSubmissionTimestamp = task.timestamp ?? now
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    now - taskSubmissionTimestamp
  )
}
1013
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage. Does nothing when the task failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // The stored utilization is averaged with the task one, or initialized with it when unset.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1046
/**
 * Chooses a worker node for the next task.
 *
 * When the dynamic worker creation conditions are met, a dynamic worker node is created first;
 * it is chosen directly if the strategy policy has `dynamicWorkerUsage` set.
 * In every other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const { dynamicWorkerUsage } =
      this.workerChoiceStrategyContext.getStrategyPolicy()
    if (dynamicWorkerUsage) {
      return dynamicWorkerNodeKey
    }
  }
  return this.workerChoiceStrategyContext.execute()
}
1065
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1074
/**
 * Sends a message to the worker identified by its worker node key.
 * Implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1087
1088 /**
1089 * Creates a new worker.
1090 *
1091 * @returns Newly created worker.
1092 */
// Abstract: no implementation in this class. createAndSetupWorkerNode calls it
// first and then attaches the event listeners to the returned worker.
1093 protected abstract createWorker (): Worker
1094
1095 /**
1096 * Creates a new, completely set up worker node.
1097 *
1098 * @returns New, completely set up worker node key.
1099 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers are registered first, in the same order as the
  // events they belong to; missing handlers fall back to a no-op.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling: flag the worker node as not ready, close its
  // channel, notify, optionally restart and finally hand over its queued tasks.
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestart =
      this.opts.restartWorkerOnError === true &&
      this.started &&
      !this.starting
    if (shallRestart && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestart) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node is dropped from the pool once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(createdWorkerNodeKey)
  return createdWorkerNodeKey
}
1138
1139 /**
1140 * Creates a new, completely set up dynamic worker node.
1141 *
1142 * @returns New, completely set up dynamic worker node key.
1143 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    // The key is resolved again from the worker id carried by the message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is only honored when nothing is executing and, with the
    // tasks queue enabled, nothing is queued on the worker node.
    const hasNoPendingWork = (): boolean => {
      if (workerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(localWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    if (shallDestroy) {
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  // The strategy policy may require the dynamic worker node to be flagged as
  // ready immediately.
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1181
1182 /**
1183 * Registers a listener callback on the worker given its worker node key.
1184 *
1185 * @param workerNodeKey - The worker node key.
1186 * @param listener - The message listener callback.
1187 */
// Abstract: no implementation in this class. Used in this class to attach both
// the generic worker listener (afterWorkerNodeSetup) and the kill message
// listener of dynamic worker nodes (createAndSetupDynamicWorkerNode).
1188 protected abstract registerWorkerMessageListener<
1189 Message extends Data | Response
1190 >(
1191 workerNodeKey: number,
1192 listener: (message: MessageValue<Message>) => void
1193 ): void
1194
1195 /**
1196 * Method hooked up after a worker node has been newly created.
1197 * Can be overridden.
1198 *
1199 * @param workerNodeKey - The newly created worker node key.
1200 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listener first, then the startup and statistics messages, in that order.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Task stealing callbacks only make sense with a tasks queue.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const { tasksQueueOptions } = this.opts
  if (tasksQueueOptions?.taskStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
1219
1220 /**
1221 * Sends the startup message to worker given its worker node key.
1222 *
1223 * @param workerNodeKey - The worker node key.
1224 */
// Abstract: no implementation in this class. Invoked from afterWorkerNodeSetup
// right after the worker message listener has been registered.
1225 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1226
1227 /**
1228 * Sends the statistics message to worker given its worker node key.
1229 *
1230 * @param workerNodeKey - The worker node key.
1231 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  // Only the aggregate flags of the run time and ELU requirements are sent.
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId
  })
}
1244
/**
 * Moves every task still queued on the given worker node to other worker nodes.
 *
 * For each task the destination is the ready worker node with the fewest
 * queued tasks. The task is executed right away when the destination can take
 * it, otherwise it is enqueued there.
 *
 * The worker node being drained is never chosen as destination: handing a task
 * back to it would either run the task on the very node that is being emptied
 * or put it back on the queue this loop is draining, which can keep the loop
 * from terminating. When there is no other worker node, the queued tasks are
 * left untouched.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is drained.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      if (candidateWorkerNodeKey === workerNodeKey) {
        continue
      }
      if (destinationWorkerNodeKey === -1) {
        // First eligible worker node: used as the fallback destination.
        destinationWorkerNodeKey = candidateWorkerNodeKey
        continue
      }
      const currentDestination = this.workerNodes[destinationWorkerNodeKey]
      // A ready worker node replaces a not ready fallback, or a ready
      // destination holding more queued tasks.
      if (
        candidateWorkerNode.info.ready &&
        (!currentDestination.info.ready ||
          candidateWorkerNode.usage.tasks.queued <
            currentDestination.usage.tasks.queued)
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No other worker node exists: nothing to redistribute to.
      return
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1269
/**
 * Increments the stolen tasks counters of a worker node.
 *
 * The worker node usage counter is always incremented. The task function
 * usage counter is incremented only when task function usage shall be
 * updated for the worker node and a usage entry exists for the task name.
 * Nothing is done when no worker node exists at the given key.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Single guard for both updates: the worker node may be missing.
  if (workerNode == null) {
    return
  }
  if (workerNode.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // Looked up once and reused for both the check and the update.
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1288
/**
 * Empty queue callback: steals one task for the worker whose queue is empty.
 *
 * The source is the ready worker node, other than the requesting one, with
 * the most queued tasks. The stolen task is executed immediately when the
 * destination can take it, enqueued otherwise, and counted as stolen on the
 * destination.
 *
 * @param workerId - The id of the worker whose tasks queue is empty.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  // Sorted copy, most queued tasks first; the pool's own array is untouched.
  const byQueuedTasksDescending = [...this.workerNodes].sort(
    (left, right) => right.usage.tasks.queued - left.usage.tasks.queued
  )
  const sourceWorkerNode = byQueuedTasksDescending.find(
    candidate =>
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  const task = {
    ...stolenTask,
    workerId: destinationWorkerNode.info.id as number
  }
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, task)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, task)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    task.name as string
  )
}
1320
/**
 * Back pressure callback: hands tasks queued on the worker under back
 * pressure over to other worker nodes.
 *
 * Candidates are visited from the least to the most queued. Each ready
 * candidate, other than the source, whose queue holds fewer tasks than the
 * configured size minus one receives one task popped from the source. The
 * task is executed immediately when the candidate can take it, enqueued
 * otherwise, and counted as stolen on the candidate.
 *
 * The candidate's key is looked up in the pool's worker nodes array: the
 * position within the sorted copy is not a valid worker node key.
 *
 * @param workerId - The id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (sourceWorkerNode.usage.tasks.queued <= 0) {
      // Nothing left to hand over.
      break
    }
    if (
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueSize - sizeOffset
    ) {
      // Real key of the candidate within the pool, not its sorted position.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1358
1359 /**
1360 * This method is the listener registered for each worker message.
1361 *
1362 * @returns The listener function to execute when a message is received from a worker.
1363 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions, workerId } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(workerId)
      )
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1381
/**
 * Handles the ready response of a worker.
 *
 * Stores the ready flag and the task functions carried by the message on the
 * worker info, then emits the pool ready event when an emitter exists and the
 * pool is ready.
 *
 * @param message - The ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1395
/**
 * Handles the execution response of a task.
 *
 * Settles the promise registered for the task id (rejection with the task
 * error message, resolution with the response data), runs the after task
 * execution hook, updates the worker choice strategy context and forgets the
 * promise. With the tasks queue enabled, the next queued task of the worker
 * node is executed when its executing tasks count is below the configured
 * concurrency. Messages with an unknown task id are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  if (pendingResponse == null) {
    return
  }
  const { taskError, data } = message
  if (taskError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingResponse.reject(taskError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
1423
/**
 * Emits the busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1429
/**
 * Emits the back pressure event, with the pool information, when the pool has
 * back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1435
/**
 * Emits the full event, with the pool information, when the pool is dynamic
 * and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1443
1444 /**
1445 * Gets the worker information given its worker node key.
1446 *
1447 * @param workerNodeKey - The worker node key.
1448 * @returns The worker information.
1449 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  // The key is a position in the pool's worker nodes array.
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1453
1454 /**
1455 * Adds the given worker in the pool worker nodes.
1456 *
1457 * @param worker - The worker.
1458 * @returns The added worker node key.
1459 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1460 */
private addWorkerNode (worker: Worker): number {
  // Without a configured tasks queue size, the square of the pool maximum
  // size is passed to the worker node.
  const tasksQueueSizeArgument =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueSizeArgument
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
1477
1478 /**
1479 * Removes the given worker from the pool worker nodes.
1480 *
1481 * @param worker - The worker.
1482 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  // Unknown worker: nothing to remove.
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  // The worker choice strategy context is told about the removed key as well.
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1490
1491 /** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only reported when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1498
/**
 * Whether the pool as a whole has back pressure.
 *
 * @returns `true` when the tasks queue is enabled and no worker node is free of back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1507
1508 /**
1509 * Executes the given task on the worker given its worker node key.
1510 *
1511 * @param workerNodeKey - The worker node key.
1512 * @param task - The task to execute.
1513 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // Hook first, then the task is sent along with its transfer list.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1519
/**
 * Enqueues the given task on the worker node given its key, then checks
 * whether a task queuing event has to be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const sizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return sizeAfterEnqueue
}
1525
/**
 * Dequeues a task from the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1529
/**
 * Gets the tasks queue size of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1533
/**
 * Executes every task queued on the worker node given its key, then clears
 * its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
    // Re-read the size: it is the queue itself that decides when to stop.
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1543
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1549 }