refactor: use pool side task functions list for hasTaskFunction()
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import type { TaskFunction } from '../worker/task-functions'
25 import {
26 type IPool,
27 PoolEmitter,
28 PoolEvents,
29 type PoolInfo,
30 type PoolOptions,
31 type PoolType,
32 PoolTypes,
33 type TasksQueueOptions
34 } from './pool'
35 import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
41 } from './worker'
42 import {
43 type MeasurementStatisticsRequirements,
44 Measurements,
45 WorkerChoiceStrategies,
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
48 } from './selection-strategies/selection-strategies-types'
49 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
50 import { version } from './version'
51 import { WorkerNode } from './worker-node'
52
53 /**
54 * Base class that implements some shared logic for all poolifier pools.
55 *
56 * @typeParam Worker - Type of worker which manages this pool.
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
59 */
60 export abstract class AbstractPool<
61 Worker extends IWorker,
62 Data = unknown,
63 Response = unknown
64 > implements IPool<Worker, Data, Response> {
65 /** @inheritDoc */
66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
67
68 /** @inheritDoc */
69 public readonly emitter?: PoolEmitter
70
71 /**
72 * The task execution response promise map.
73 *
74 * - `key`: The message id of each submitted task.
75 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
76 *
77 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
78 */
79 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
80 new Map<string, PromiseResponseWrapper<Response>>()
81
82 /**
83 * Worker choice strategy context referencing a worker choice algorithm implementation.
84 */
85 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
86 Worker,
87 Data,
88 Response
89 >
90
91 /**
92 * Dynamic pool maximum size property placeholder.
93 */
94 protected readonly max?: number
95
96 /**
97 * Whether the pool is starting or not.
98 */
99 private readonly starting: boolean
100 /**
101 * Whether the pool is started or not.
102 */
103 private started: boolean
104 /**
105 * The start timestamp of the pool.
106 */
107 private readonly startTimestamp
108
109 /**
110 * Constructs a new poolifier pool.
111 *
112 * @param numberOfWorkers - Number of workers that this pool should manage.
113 * @param filePath - Path to the worker file.
114 * @param opts - Options for the pool.
115 */
116 public constructor (
117 protected readonly numberOfWorkers: number,
118 protected readonly filePath: string,
119 protected readonly opts: PoolOptions<Worker>
120 ) {
121 if (!this.isMain()) {
122 throw new Error(
123 'Cannot start a pool from a worker with the same type as the pool'
124 )
125 }
126 this.checkNumberOfWorkers(this.numberOfWorkers)
127 this.checkFilePath(this.filePath)
128 this.checkPoolOptions(this.opts)
129
130 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
131 this.executeTask = this.executeTask.bind(this)
132 this.enqueueTask = this.enqueueTask.bind(this)
133
134 if (this.opts.enableEvents === true) {
135 this.emitter = new PoolEmitter()
136 }
137 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
138 Worker,
139 Data,
140 Response
141 >(
142 this,
143 this.opts.workerChoiceStrategy,
144 this.opts.workerChoiceStrategyOptions
145 )
146
147 this.setupHook()
148
149 this.starting = true
150 this.startPool()
151 this.starting = false
152 this.started = true
153
154 this.startTimestamp = performance.now()
155 }
156
157 private checkFilePath (filePath: string): void {
158 if (
159 filePath == null ||
160 typeof filePath !== 'string' ||
161 (typeof filePath === 'string' && filePath.trim().length === 0)
162 ) {
163 throw new Error('Please specify a file with a worker implementation')
164 }
165 if (!existsSync(filePath)) {
166 throw new Error(`Cannot find the worker file '${filePath}'`)
167 }
168 }
169
170 private checkNumberOfWorkers (numberOfWorkers: number): void {
171 if (numberOfWorkers == null) {
172 throw new Error(
173 'Cannot instantiate a pool without specifying the number of workers'
174 )
175 } else if (!Number.isSafeInteger(numberOfWorkers)) {
176 throw new TypeError(
177 'Cannot instantiate a pool with a non safe integer number of workers'
178 )
179 } else if (numberOfWorkers < 0) {
180 throw new RangeError(
181 'Cannot instantiate a pool with a negative number of workers'
182 )
183 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
184 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
185 }
186 }
187
188 protected checkDynamicPoolSize (min: number, max: number): void {
189 if (this.type === PoolTypes.dynamic) {
190 if (max == null) {
191 throw new TypeError(
192 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
193 )
194 } else if (!Number.isSafeInteger(max)) {
195 throw new TypeError(
196 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
197 )
198 } else if (min > max) {
199 throw new RangeError(
200 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
201 )
202 } else if (max === 0) {
203 throw new RangeError(
204 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
205 )
206 } else if (min === max) {
207 throw new RangeError(
208 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
209 )
210 }
211 }
212 }
213
214 private checkPoolOptions (opts: PoolOptions<Worker>): void {
215 if (isPlainObject(opts)) {
216 this.opts.workerChoiceStrategy =
217 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
218 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
219 this.opts.workerChoiceStrategyOptions = {
220 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
221 ...opts.workerChoiceStrategyOptions
222 }
223 this.checkValidWorkerChoiceStrategyOptions(
224 this.opts.workerChoiceStrategyOptions
225 )
226 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
227 this.opts.enableEvents = opts.enableEvents ?? true
228 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
229 if (this.opts.enableTasksQueue) {
230 this.checkValidTasksQueueOptions(
231 opts.tasksQueueOptions as TasksQueueOptions
232 )
233 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
234 opts.tasksQueueOptions as TasksQueueOptions
235 )
236 }
237 } else {
238 throw new TypeError('Invalid pool options: must be a plain object')
239 }
240 }
241
242 private checkValidWorkerChoiceStrategy (
243 workerChoiceStrategy: WorkerChoiceStrategy
244 ): void {
245 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
246 throw new Error(
247 `Invalid worker choice strategy '${workerChoiceStrategy}'`
248 )
249 }
250 }
251
252 private checkValidWorkerChoiceStrategyOptions (
253 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
254 ): void {
255 if (!isPlainObject(workerChoiceStrategyOptions)) {
256 throw new TypeError(
257 'Invalid worker choice strategy options: must be a plain object'
258 )
259 }
260 if (
261 workerChoiceStrategyOptions.retries != null &&
262 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
263 ) {
264 throw new TypeError(
265 'Invalid worker choice strategy options: retries must be an integer'
266 )
267 }
268 if (
269 workerChoiceStrategyOptions.retries != null &&
270 workerChoiceStrategyOptions.retries < 0
271 ) {
272 throw new RangeError(
273 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
274 )
275 }
276 if (
277 workerChoiceStrategyOptions.weights != null &&
278 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
279 ) {
280 throw new Error(
281 'Invalid worker choice strategy options: must have a weight for each worker node'
282 )
283 }
284 if (
285 workerChoiceStrategyOptions.measurement != null &&
286 !Object.values(Measurements).includes(
287 workerChoiceStrategyOptions.measurement
288 )
289 ) {
290 throw new Error(
291 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
292 )
293 }
294 }
295
296 private checkValidTasksQueueOptions (
297 tasksQueueOptions: TasksQueueOptions
298 ): void {
299 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
300 throw new TypeError('Invalid tasks queue options: must be a plain object')
301 }
302 if (
303 tasksQueueOptions?.concurrency != null &&
304 !Number.isSafeInteger(tasksQueueOptions?.concurrency)
305 ) {
306 throw new TypeError(
307 'Invalid worker node tasks concurrency: must be an integer'
308 )
309 }
310 if (
311 tasksQueueOptions?.concurrency != null &&
312 tasksQueueOptions?.concurrency <= 0
313 ) {
314 throw new RangeError(
315 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
316 )
317 }
318 if (tasksQueueOptions?.queueMaxSize != null) {
319 throw new Error(
320 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
321 )
322 }
323 if (
324 tasksQueueOptions?.size != null &&
325 !Number.isSafeInteger(tasksQueueOptions?.size)
326 ) {
327 throw new TypeError(
328 'Invalid worker node tasks queue size: must be an integer'
329 )
330 }
331 if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
332 throw new RangeError(
333 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
334 )
335 }
336 }
337
338 private startPool (): void {
339 while (
340 this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
343 0
344 ) < this.numberOfWorkers
345 ) {
346 this.createAndSetupWorkerNode()
347 }
348 }
349
350 /** @inheritDoc */
351 public get info (): PoolInfo {
352 return {
353 version,
354 type: this.type,
355 worker: this.worker,
356 ready: this.ready,
357 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
358 minSize: this.minSize,
359 maxSize: this.maxSize,
360 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
361 .runTime.aggregate &&
362 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
363 .waitTime.aggregate && { utilization: round(this.utilization) }),
364 workerNodes: this.workerNodes.length,
365 idleWorkerNodes: this.workerNodes.reduce(
366 (accumulator, workerNode) =>
367 workerNode.usage.tasks.executing === 0
368 ? accumulator + 1
369 : accumulator,
370 0
371 ),
372 busyWorkerNodes: this.workerNodes.reduce(
373 (accumulator, workerNode) =>
374 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
375 0
376 ),
377 executedTasks: this.workerNodes.reduce(
378 (accumulator, workerNode) =>
379 accumulator + workerNode.usage.tasks.executed,
380 0
381 ),
382 executingTasks: this.workerNodes.reduce(
383 (accumulator, workerNode) =>
384 accumulator + workerNode.usage.tasks.executing,
385 0
386 ),
387 ...(this.opts.enableTasksQueue === true && {
388 queuedTasks: this.workerNodes.reduce(
389 (accumulator, workerNode) =>
390 accumulator + workerNode.usage.tasks.queued,
391 0
392 )
393 }),
394 ...(this.opts.enableTasksQueue === true && {
395 maxQueuedTasks: this.workerNodes.reduce(
396 (accumulator, workerNode) =>
397 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
398 0
399 )
400 }),
401 ...(this.opts.enableTasksQueue === true && {
402 backPressure: this.hasBackPressure()
403 }),
404 ...(this.opts.enableTasksQueue === true && {
405 stolenTasks: this.workerNodes.reduce(
406 (accumulator, workerNode) =>
407 accumulator + workerNode.usage.tasks.stolen,
408 0
409 )
410 }),
411 failedTasks: this.workerNodes.reduce(
412 (accumulator, workerNode) =>
413 accumulator + workerNode.usage.tasks.failed,
414 0
415 ),
416 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
417 .runTime.aggregate && {
418 runTime: {
419 minimum: round(
420 min(
421 ...this.workerNodes.map(
422 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
423 )
424 )
425 ),
426 maximum: round(
427 max(
428 ...this.workerNodes.map(
429 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
430 )
431 )
432 ),
433 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
434 .runTime.average && {
435 average: round(
436 average(
437 this.workerNodes.reduce<number[]>(
438 (accumulator, workerNode) =>
439 accumulator.concat(workerNode.usage.runTime.history),
440 []
441 )
442 )
443 )
444 }),
445 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
446 .runTime.median && {
447 median: round(
448 median(
449 this.workerNodes.reduce<number[]>(
450 (accumulator, workerNode) =>
451 accumulator.concat(workerNode.usage.runTime.history),
452 []
453 )
454 )
455 )
456 })
457 }
458 }),
459 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
460 .waitTime.aggregate && {
461 waitTime: {
462 minimum: round(
463 min(
464 ...this.workerNodes.map(
465 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
466 )
467 )
468 ),
469 maximum: round(
470 max(
471 ...this.workerNodes.map(
472 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
473 )
474 )
475 ),
476 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
477 .waitTime.average && {
478 average: round(
479 average(
480 this.workerNodes.reduce<number[]>(
481 (accumulator, workerNode) =>
482 accumulator.concat(workerNode.usage.waitTime.history),
483 []
484 )
485 )
486 )
487 }),
488 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
489 .waitTime.median && {
490 median: round(
491 median(
492 this.workerNodes.reduce<number[]>(
493 (accumulator, workerNode) =>
494 accumulator.concat(workerNode.usage.waitTime.history),
495 []
496 )
497 )
498 )
499 })
500 }
501 })
502 }
503 }
504
505 /**
506 * The pool readiness boolean status.
507 */
508 private get ready (): boolean {
509 return (
510 this.workerNodes.reduce(
511 (accumulator, workerNode) =>
512 !workerNode.info.dynamic && workerNode.info.ready
513 ? accumulator + 1
514 : accumulator,
515 0
516 ) >= this.minSize
517 )
518 }
519
520 /**
521 * The approximate pool utilization.
522 *
523 * @returns The pool utilization.
524 */
525 private get utilization (): number {
526 const poolTimeCapacity =
527 (performance.now() - this.startTimestamp) * this.maxSize
528 const totalTasksRunTime = this.workerNodes.reduce(
529 (accumulator, workerNode) =>
530 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
531 0
532 )
533 const totalTasksWaitTime = this.workerNodes.reduce(
534 (accumulator, workerNode) =>
535 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
536 0
537 )
538 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
539 }
540
541 /**
542 * The pool type.
543 *
544 * If it is `'dynamic'`, it provides the `max` property.
545 */
546 protected abstract get type (): PoolType
547
548 /**
549 * The worker type.
550 */
551 protected abstract get worker (): WorkerType
552
553 /**
554 * The pool minimum size.
555 */
556 protected get minSize (): number {
557 return this.numberOfWorkers
558 }
559
560 /**
561 * The pool maximum size.
562 */
563 protected get maxSize (): number {
564 return this.max ?? this.numberOfWorkers
565 }
566
567 /**
568 * Checks if the worker id sent in the received message from a worker is valid.
569 *
570 * @param message - The received message.
571 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
572 */
573 private checkMessageWorkerId (message: MessageValue<Response>): void {
574 if (message.workerId == null) {
575 throw new Error('Worker message received without worker id')
576 } else if (
577 message.workerId != null &&
578 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
579 ) {
580 throw new Error(
581 `Worker message received from unknown worker '${message.workerId}'`
582 )
583 }
584 }
585
586 /**
587 * Gets the given worker its worker node key.
588 *
589 * @param worker - The worker.
590 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
591 */
592 private getWorkerNodeKeyByWorker (worker: Worker): number {
593 return this.workerNodes.findIndex(
594 workerNode => workerNode.worker === worker
595 )
596 }
597
598 /**
599 * Gets the worker node key given its worker id.
600 *
601 * @param workerId - The worker id.
602 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
603 */
604 private getWorkerNodeKeyByWorkerId (workerId: number): number {
605 return this.workerNodes.findIndex(
606 workerNode => workerNode.info.id === workerId
607 )
608 }
609
610 /** @inheritDoc */
611 public setWorkerChoiceStrategy (
612 workerChoiceStrategy: WorkerChoiceStrategy,
613 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
614 ): void {
615 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
616 this.opts.workerChoiceStrategy = workerChoiceStrategy
617 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
618 this.opts.workerChoiceStrategy
619 )
620 if (workerChoiceStrategyOptions != null) {
621 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
622 }
623 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
624 workerNode.resetUsage()
625 this.sendStatisticsMessageToWorker(workerNodeKey)
626 }
627 }
628
629 /** @inheritDoc */
630 public setWorkerChoiceStrategyOptions (
631 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
632 ): void {
633 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
634 this.opts.workerChoiceStrategyOptions = {
635 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
636 ...workerChoiceStrategyOptions
637 }
638 this.workerChoiceStrategyContext.setOptions(
639 this.opts.workerChoiceStrategyOptions
640 )
641 }
642
643 /** @inheritDoc */
644 public enableTasksQueue (
645 enable: boolean,
646 tasksQueueOptions?: TasksQueueOptions
647 ): void {
648 if (this.opts.enableTasksQueue === true && !enable) {
649 this.flushTasksQueues()
650 }
651 this.opts.enableTasksQueue = enable
652 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
653 }
654
655 /** @inheritDoc */
656 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
657 if (this.opts.enableTasksQueue === true) {
658 this.checkValidTasksQueueOptions(tasksQueueOptions)
659 this.opts.tasksQueueOptions =
660 this.buildTasksQueueOptions(tasksQueueOptions)
661 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
662 } else if (this.opts.tasksQueueOptions != null) {
663 delete this.opts.tasksQueueOptions
664 }
665 }
666
667 private setTasksQueueSize (size: number): void {
668 for (const workerNode of this.workerNodes) {
669 workerNode.tasksQueueBackPressureSize = size
670 }
671 }
672
673 private buildTasksQueueOptions (
674 tasksQueueOptions: TasksQueueOptions
675 ): TasksQueueOptions {
676 return {
677 ...{
678 size: Math.pow(this.maxSize, 2),
679 concurrency: 1
680 },
681 ...tasksQueueOptions
682 }
683 }
684
685 /**
686 * Whether the pool is full or not.
687 *
688 * The pool filling boolean status.
689 */
690 protected get full (): boolean {
691 return this.workerNodes.length >= this.maxSize
692 }
693
694 /**
695 * Whether the pool is busy or not.
696 *
697 * The pool busyness boolean status.
698 */
699 protected abstract get busy (): boolean
700
701 /**
702 * Whether worker nodes are executing concurrently their tasks quota or not.
703 *
704 * @returns Worker nodes busyness boolean status.
705 */
706 protected internalBusy (): boolean {
707 if (this.opts.enableTasksQueue === true) {
708 return (
709 this.workerNodes.findIndex(
710 workerNode =>
711 workerNode.info.ready &&
712 workerNode.usage.tasks.executing <
713 (this.opts.tasksQueueOptions?.concurrency as number)
714 ) === -1
715 )
716 } else {
717 return (
718 this.workerNodes.findIndex(
719 workerNode =>
720 workerNode.info.ready && workerNode.usage.tasks.executing === 0
721 ) === -1
722 )
723 }
724 }
725
726 private sendToWorkers (message: Omit<MessageValue<Data>, 'workerId'>): number {
727 let messagesCount = 0
728 for (const [workerNodeKey] of this.workerNodes.entries()) {
729 this.sendToWorker(workerNodeKey, {
730 ...message,
731 workerId: this.getWorkerInfo(workerNodeKey).id as number
732 })
733 ++messagesCount
734 }
735 return messagesCount
736 }
737
738 /** @inheritDoc */
739 public hasTaskFunction (name: string): boolean {
740 for (const workerNode of this.workerNodes) {
741 if (
742 Array.isArray(workerNode.info.taskFunctionNames) &&
743 workerNode.info.taskFunctionNames.includes(name)
744 ) {
745 return true
746 }
747 }
748 return false
749 }
750
751 /** @inheritDoc */
752 public addTaskFunction (name: string, taskFunction: TaskFunction): boolean {
753 this.sendToWorkers({
754 taskFunctionOperation: 'add',
755 taskFunctionName: name,
756 taskFunction: taskFunction.toString()
757 })
758 return true
759 }
760
761 /** @inheritDoc */
762 public removeTaskFunction (name: string): boolean {
763 this.sendToWorkers({
764 taskFunctionOperation: 'remove',
765 taskFunctionName: name
766 })
767 return true
768 }
769
770 /** @inheritDoc */
771 public listTaskFunctionNames (): string[] {
772 for (const workerNode of this.workerNodes) {
773 if (
774 Array.isArray(workerNode.info.taskFunctionNames) &&
775 workerNode.info.taskFunctionNames.length > 0
776 ) {
777 return workerNode.info.taskFunctionNames
778 }
779 }
780 return []
781 }
782
783 /** @inheritDoc */
784 public setDefaultTaskFunction (name: string): boolean {
785 this.sendToWorkers({
786 taskFunctionOperation: 'default',
787 taskFunctionName: name
788 })
789 return true
790 }
791
792 private shallExecuteTask (workerNodeKey: number): boolean {
793 return (
794 this.tasksQueueSize(workerNodeKey) === 0 &&
795 this.workerNodes[workerNodeKey].usage.tasks.executing <
796 (this.opts.tasksQueueOptions?.concurrency as number)
797 )
798 }
799
800 /** @inheritDoc */
801 public async execute (
802 data?: Data,
803 name?: string,
804 transferList?: TransferListItem[]
805 ): Promise<Response> {
806 return await new Promise<Response>((resolve, reject) => {
807 if (!this.started) {
808 reject(new Error('Cannot execute a task on destroyed pool'))
809 return
810 }
811 if (name != null && typeof name !== 'string') {
812 reject(new TypeError('name argument must be a string'))
813 return
814 }
815 if (
816 name != null &&
817 typeof name === 'string' &&
818 name.trim().length === 0
819 ) {
820 reject(new TypeError('name argument must not be an empty string'))
821 return
822 }
823 if (transferList != null && !Array.isArray(transferList)) {
824 reject(new TypeError('transferList argument must be an array'))
825 return
826 }
827 const timestamp = performance.now()
828 const workerNodeKey = this.chooseWorkerNode()
829 const task: Task<Data> = {
830 name: name ?? DEFAULT_TASK_NAME,
831 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
832 data: data ?? ({} as Data),
833 transferList,
834 timestamp,
835 workerId: this.getWorkerInfo(workerNodeKey).id as number,
836 taskId: randomUUID()
837 }
838 this.promiseResponseMap.set(task.taskId as string, {
839 resolve,
840 reject,
841 workerNodeKey
842 })
843 if (
844 this.opts.enableTasksQueue === false ||
845 (this.opts.enableTasksQueue === true &&
846 this.shallExecuteTask(workerNodeKey))
847 ) {
848 this.executeTask(workerNodeKey, task)
849 } else {
850 this.enqueueTask(workerNodeKey, task)
851 }
852 })
853 }
854
855 /** @inheritDoc */
856 public async destroy (): Promise<void> {
857 await Promise.all(
858 this.workerNodes.map(async (_, workerNodeKey) => {
859 await this.destroyWorkerNode(workerNodeKey)
860 })
861 )
862 this.emitter?.emit(PoolEvents.destroy, this.info)
863 this.started = false
864 }
865
866 protected async sendKillMessageToWorker (
867 workerNodeKey: number,
868 workerId: number
869 ): Promise<void> {
870 await new Promise<void>((resolve, reject) => {
871 this.registerWorkerMessageListener(workerNodeKey, message => {
872 if (message.kill === 'success') {
873 resolve()
874 } else if (message.kill === 'failure') {
875 reject(new Error(`Worker ${workerId} kill message handling failed`))
876 }
877 })
878 this.sendToWorker(workerNodeKey, { kill: true, workerId })
879 })
880 }
881
882 /**
883 * Terminates the worker node given its worker node key.
884 *
885 * @param workerNodeKey - The worker node key.
886 */
887 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
888
889 /**
890 * Setup hook to execute code before worker nodes are created in the abstract constructor.
891 * Can be overridden.
892 *
893 * @virtual
894 */
895 protected setupHook (): void {
896 /* Intentionally empty */
897 }
898
899 /**
900 * Should return whether the worker is the main worker or not.
901 */
902 protected abstract isMain (): boolean
903
904 /**
905 * Hook executed before the worker task execution.
906 * Can be overridden.
907 *
908 * @param workerNodeKey - The worker node key.
909 * @param task - The task to execute.
910 */
911 protected beforeTaskExecutionHook (
912 workerNodeKey: number,
913 task: Task<Data>
914 ): void {
915 if (this.workerNodes[workerNodeKey]?.usage != null) {
916 const workerUsage = this.workerNodes[workerNodeKey].usage
917 ++workerUsage.tasks.executing
918 this.updateWaitTimeWorkerUsage(workerUsage, task)
919 }
920 if (
921 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
922 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
923 task.name as string
924 ) != null
925 ) {
926 const taskFunctionWorkerUsage = this.workerNodes[
927 workerNodeKey
928 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
929 ++taskFunctionWorkerUsage.tasks.executing
930 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
931 }
932 }
933
934 /**
935 * Hook executed after the worker task execution.
936 * Can be overridden.
937 *
938 * @param workerNodeKey - The worker node key.
939 * @param message - The received message.
940 */
941 protected afterTaskExecutionHook (
942 workerNodeKey: number,
943 message: MessageValue<Response>
944 ): void {
945 if (this.workerNodes[workerNodeKey]?.usage != null) {
946 const workerUsage = this.workerNodes[workerNodeKey].usage
947 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
948 this.updateRunTimeWorkerUsage(workerUsage, message)
949 this.updateEluWorkerUsage(workerUsage, message)
950 }
951 if (
952 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
953 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
954 message.taskPerformance?.name as string
955 ) != null
956 ) {
957 const taskFunctionWorkerUsage = this.workerNodes[
958 workerNodeKey
959 ].getTaskFunctionWorkerUsage(
960 message.taskPerformance?.name as string
961 ) as WorkerUsage
962 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
963 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
964 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
965 }
966 }
967
968 /**
969 * Whether the worker node shall update its task function worker usage or not.
970 *
971 * @param workerNodeKey - The worker node key.
972 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
973 */
974 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
975 const workerInfo = this.getWorkerInfo(workerNodeKey)
976 return (
977 workerInfo != null &&
978 Array.isArray(workerInfo.taskFunctionNames) &&
979 workerInfo.taskFunctionNames.length > 2
980 )
981 }
982
983 private updateTaskStatisticsWorkerUsage (
984 workerUsage: WorkerUsage,
985 message: MessageValue<Response>
986 ): void {
987 const workerTaskStatistics = workerUsage.tasks
988 if (
989 workerTaskStatistics.executing != null &&
990 workerTaskStatistics.executing > 0
991 ) {
992 --workerTaskStatistics.executing
993 }
994 if (message.workerError == null) {
995 ++workerTaskStatistics.executed
996 } else {
997 ++workerTaskStatistics.failed
998 }
999 }
1000
1001 private updateRunTimeWorkerUsage (
1002 workerUsage: WorkerUsage,
1003 message: MessageValue<Response>
1004 ): void {
1005 if (message.workerError != null) {
1006 return
1007 }
1008 updateMeasurementStatistics(
1009 workerUsage.runTime,
1010 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
1011 message.taskPerformance?.runTime ?? 0
1012 )
1013 }
1014
1015 private updateWaitTimeWorkerUsage (
1016 workerUsage: WorkerUsage,
1017 task: Task<Data>
1018 ): void {
1019 const timestamp = performance.now()
1020 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
1021 updateMeasurementStatistics(
1022 workerUsage.waitTime,
1023 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
1024 taskWaitTime
1025 )
1026 }
1027
1028 private updateEluWorkerUsage (
1029 workerUsage: WorkerUsage,
1030 message: MessageValue<Response>
1031 ): void {
1032 if (message.workerError != null) {
1033 return
1034 }
1035 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1036 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
1037 updateMeasurementStatistics(
1038 workerUsage.elu.active,
1039 eluTaskStatisticsRequirements,
1040 message.taskPerformance?.elu?.active ?? 0
1041 )
1042 updateMeasurementStatistics(
1043 workerUsage.elu.idle,
1044 eluTaskStatisticsRequirements,
1045 message.taskPerformance?.elu?.idle ?? 0
1046 )
1047 if (eluTaskStatisticsRequirements.aggregate) {
1048 if (message.taskPerformance?.elu != null) {
1049 if (workerUsage.elu.utilization != null) {
1050 workerUsage.elu.utilization =
1051 (workerUsage.elu.utilization +
1052 message.taskPerformance.elu.utilization) /
1053 2
1054 } else {
1055 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1056 }
1057 }
1058 }
1059 }
1060
1061 /**
1062 * Chooses a worker node for the next task.
1063 *
1064 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1065 *
1066 * @returns The chosen worker node key
1067 */
1068 private chooseWorkerNode (): number {
1069 if (this.shallCreateDynamicWorker()) {
1070 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1071 if (
1072 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1073 ) {
1074 return workerNodeKey
1075 }
1076 }
1077 return this.workerChoiceStrategyContext.execute()
1078 }
1079
1080 /**
1081 * Conditions for dynamic worker creation.
1082 *
1083 * @returns Whether to create a dynamic worker or not.
1084 */
1085 private shallCreateDynamicWorker (): boolean {
1086 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1087 }
1088
1089 /**
1090 * Sends a message to worker given its worker node key.
1091 *
1092 * @param workerNodeKey - The worker node key.
1093 * @param message - The message.
1094 * @param transferList - The optional array of transferable objects.
1095 */
1096 protected abstract sendToWorker (
1097 workerNodeKey: number,
1098 message: MessageValue<Data>,
1099 transferList?: TransferListItem[]
1100 ): void
1101
1102 /**
1103 * Creates a new worker.
1104 *
1105 * @returns Newly created worker.
1106 */
1107 protected abstract createWorker (): Worker
1108
1109 /**
1110 * Creates a new, completely set up worker node.
1111 *
1112 * @returns New, completely set up worker node key.
1113 */
1114 protected createAndSetupWorkerNode (): number {
1115 const worker = this.createWorker()
1116
1117 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1118 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1119 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1120 worker.on('error', error => {
1121 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1122 const workerInfo = this.getWorkerInfo(workerNodeKey)
1123 workerInfo.ready = false
1124 this.workerNodes[workerNodeKey].closeChannel()
1125 this.emitter?.emit(PoolEvents.error, error)
1126 if (
1127 this.opts.restartWorkerOnError === true &&
1128 this.started &&
1129 !this.starting
1130 ) {
1131 if (workerInfo.dynamic) {
1132 this.createAndSetupDynamicWorkerNode()
1133 } else {
1134 this.createAndSetupWorkerNode()
1135 }
1136 }
1137 if (this.opts.enableTasksQueue === true) {
1138 this.redistributeQueuedTasks(workerNodeKey)
1139 }
1140 })
1141 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1142 worker.once('exit', () => {
1143 this.removeWorkerNode(worker)
1144 })
1145
1146 const workerNodeKey = this.addWorkerNode(worker)
1147
1148 this.afterWorkerNodeSetup(workerNodeKey)
1149
1150 return workerNodeKey
1151 }
1152
1153 /**
1154 * Creates a new, completely set up dynamic worker node.
1155 *
1156 * @returns New, completely set up dynamic worker node key.
1157 */
1158 protected createAndSetupDynamicWorkerNode (): number {
1159 const workerNodeKey = this.createAndSetupWorkerNode()
1160 this.registerWorkerMessageListener(workerNodeKey, message => {
1161 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1162 message.workerId
1163 )
1164 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1165 // Kill message received from worker
1166 if (
1167 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1168 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1169 ((this.opts.enableTasksQueue === false &&
1170 workerUsage.tasks.executing === 0) ||
1171 (this.opts.enableTasksQueue === true &&
1172 workerUsage.tasks.executing === 0 &&
1173 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1174 ) {
1175 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
1176 this.emitter?.emit(PoolEvents.error, error)
1177 })
1178 }
1179 })
1180 const workerInfo = this.getWorkerInfo(workerNodeKey)
1181 this.sendToWorker(workerNodeKey, {
1182 checkActive: true,
1183 workerId: workerInfo.id as number
1184 })
1185 workerInfo.dynamic = true
1186 if (
1187 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1188 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1189 ) {
1190 workerInfo.ready = true
1191 }
1192 this.checkAndEmitDynamicWorkerCreationEvents()
1193 return workerNodeKey
1194 }
1195
1196 /**
1197 * Registers a listener callback on the worker given its worker node key.
1198 *
1199 * @param workerNodeKey - The worker node key.
1200 * @param listener - The message listener callback.
1201 */
1202 protected abstract registerWorkerMessageListener<
1203 Message extends Data | Response
1204 >(
1205 workerNodeKey: number,
1206 listener: (message: MessageValue<Message>) => void
1207 ): void
1208
1209 /**
1210 * Method hooked up after a worker node has been newly created.
1211 * Can be overridden.
1212 *
1213 * @param workerNodeKey - The newly created worker node key.
1214 */
1215 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1216 // Listen to worker messages.
1217 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1218 // Send the startup message to worker.
1219 this.sendStartupMessageToWorker(workerNodeKey)
1220 // Send the statistics message to worker.
1221 this.sendStatisticsMessageToWorker(workerNodeKey)
1222 if (this.opts.enableTasksQueue === true) {
1223 this.workerNodes[workerNodeKey].onEmptyQueue =
1224 this.taskStealingOnEmptyQueue.bind(this)
1225 this.workerNodes[workerNodeKey].onBackPressure =
1226 this.tasksStealingOnBackPressure.bind(this)
1227 }
1228 }
1229
1230 /**
1231 * Sends the startup message to worker given its worker node key.
1232 *
1233 * @param workerNodeKey - The worker node key.
1234 */
1235 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1236
1237 /**
1238 * Sends the statistics message to worker given its worker node key.
1239 *
1240 * @param workerNodeKey - The worker node key.
1241 */
1242 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1243 this.sendToWorker(workerNodeKey, {
1244 statistics: {
1245 runTime:
1246 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1247 .runTime.aggregate,
1248 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1249 .elu.aggregate
1250 },
1251 workerId: this.getWorkerInfo(workerNodeKey).id as number
1252 })
1253 }
1254
1255 private redistributeQueuedTasks (workerNodeKey: number): void {
1256 while (this.tasksQueueSize(workerNodeKey) > 0) {
1257 const destinationWorkerNodeKey = this.workerNodes.reduce(
1258 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1259 return workerNode.info.ready &&
1260 workerNode.usage.tasks.queued <
1261 workerNodes[minWorkerNodeKey].usage.tasks.queued
1262 ? workerNodeKey
1263 : minWorkerNodeKey
1264 },
1265 0
1266 )
1267 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
1268 const task = {
1269 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
1270 workerId: destinationWorkerNode.info.id as number
1271 }
1272 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1273 this.executeTask(destinationWorkerNodeKey, task)
1274 } else {
1275 this.enqueueTask(destinationWorkerNodeKey, task)
1276 }
1277 }
1278 }
1279
1280 private updateTaskStolenStatisticsWorkerUsage (
1281 workerNodeKey: number,
1282 taskName: string
1283 ): void {
1284 const workerNode = this.workerNodes[workerNodeKey]
1285 if (workerNode?.usage != null) {
1286 ++workerNode.usage.tasks.stolen
1287 }
1288 if (
1289 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1290 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1291 ) {
1292 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1293 taskName
1294 ) as WorkerUsage
1295 ++taskFunctionWorkerUsage.tasks.stolen
1296 }
1297 }
1298
1299 private taskStealingOnEmptyQueue (workerId: number): void {
1300 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1301 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
1302 const workerNodes = this.workerNodes
1303 .slice()
1304 .sort(
1305 (workerNodeA, workerNodeB) =>
1306 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1307 )
1308 const sourceWorkerNode = workerNodes.find(
1309 workerNode =>
1310 workerNode.info.ready &&
1311 workerNode.info.id !== workerId &&
1312 workerNode.usage.tasks.queued > 0
1313 )
1314 if (sourceWorkerNode != null) {
1315 const task = {
1316 ...(sourceWorkerNode.popTask() as Task<Data>),
1317 workerId: destinationWorkerNode.info.id as number
1318 }
1319 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1320 this.executeTask(destinationWorkerNodeKey, task)
1321 } else {
1322 this.enqueueTask(destinationWorkerNodeKey, task)
1323 }
1324 this.updateTaskStolenStatisticsWorkerUsage(
1325 destinationWorkerNodeKey,
1326 task.name as string
1327 )
1328 }
1329 }
1330
1331 private tasksStealingOnBackPressure (workerId: number): void {
1332 const sizeOffset = 1
1333 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
1334 return
1335 }
1336 const sourceWorkerNode =
1337 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1338 const workerNodes = this.workerNodes
1339 .slice()
1340 .sort(
1341 (workerNodeA, workerNodeB) =>
1342 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1343 )
1344 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1345 if (
1346 sourceWorkerNode.usage.tasks.queued > 0 &&
1347 workerNode.info.ready &&
1348 workerNode.info.id !== workerId &&
1349 workerNode.usage.tasks.queued <
1350 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
1351 ) {
1352 const task = {
1353 ...(sourceWorkerNode.popTask() as Task<Data>),
1354 workerId: workerNode.info.id as number
1355 }
1356 if (this.shallExecuteTask(workerNodeKey)) {
1357 this.executeTask(workerNodeKey, task)
1358 } else {
1359 this.enqueueTask(workerNodeKey, task)
1360 }
1361 this.updateTaskStolenStatisticsWorkerUsage(
1362 workerNodeKey,
1363 task.name as string
1364 )
1365 }
1366 }
1367 }
1368
1369 /**
1370 * This method is the listener registered for each worker message.
1371 *
1372 * @returns The listener function to execute when a message is received from a worker.
1373 */
1374 protected workerListener (): (message: MessageValue<Response>) => void {
1375 return message => {
1376 this.checkMessageWorkerId(message)
1377 if (message.ready != null && message.taskFunctionNames != null) {
1378 // Worker ready response received from worker
1379 this.handleWorkerReadyResponse(message)
1380 } else if (message.taskId != null) {
1381 // Task execution response received from worker
1382 this.handleTaskExecutionResponse(message)
1383 } else if (message.taskFunctionNames != null) {
1384 // Task function names message received from worker
1385 this.getWorkerInfo(
1386 this.getWorkerNodeKeyByWorkerId(message.workerId)
1387 ).taskFunctionNames = message.taskFunctionNames
1388 } else if (message.taskFunctionOperation != null) {
1389 // Task function operation response received from worker
1390 }
1391 }
1392 }
1393
1394 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1395 if (message.ready === false) {
1396 throw new Error(`Worker ${message.workerId} failed to initialize`)
1397 }
1398 const workerInfo = this.getWorkerInfo(
1399 this.getWorkerNodeKeyByWorkerId(message.workerId)
1400 )
1401 workerInfo.ready = message.ready as boolean
1402 workerInfo.taskFunctionNames = message.taskFunctionNames
1403 if (this.emitter != null && this.ready) {
1404 this.emitter.emit(PoolEvents.ready, this.info)
1405 }
1406 }
1407
1408 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1409 const { taskId, workerError, data } = message
1410 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1411 if (promiseResponse != null) {
1412 if (workerError != null) {
1413 this.emitter?.emit(PoolEvents.taskError, workerError)
1414 promiseResponse.reject(workerError.message)
1415 } else {
1416 promiseResponse.resolve(data as Response)
1417 }
1418 const workerNodeKey = promiseResponse.workerNodeKey
1419 this.afterTaskExecutionHook(workerNodeKey, message)
1420 this.workerChoiceStrategyContext.update(workerNodeKey)
1421 this.promiseResponseMap.delete(taskId as string)
1422 if (
1423 this.opts.enableTasksQueue === true &&
1424 this.tasksQueueSize(workerNodeKey) > 0 &&
1425 this.workerNodes[workerNodeKey].usage.tasks.executing <
1426 (this.opts.tasksQueueOptions?.concurrency as number)
1427 ) {
1428 this.executeTask(
1429 workerNodeKey,
1430 this.dequeueTask(workerNodeKey) as Task<Data>
1431 )
1432 }
1433 }
1434 }
1435
1436 private checkAndEmitTaskExecutionEvents (): void {
1437 if (this.busy) {
1438 this.emitter?.emit(PoolEvents.busy, this.info)
1439 }
1440 }
1441
1442 private checkAndEmitTaskQueuingEvents (): void {
1443 if (this.hasBackPressure()) {
1444 this.emitter?.emit(PoolEvents.backPressure, this.info)
1445 }
1446 }
1447
1448 private checkAndEmitDynamicWorkerCreationEvents (): void {
1449 if (this.type === PoolTypes.dynamic) {
1450 if (this.full) {
1451 this.emitter?.emit(PoolEvents.full, this.info)
1452 }
1453 }
1454 }
1455
1456 /**
1457 * Gets the worker information given its worker node key.
1458 *
1459 * @param workerNodeKey - The worker node key.
1460 * @returns The worker information.
1461 */
1462 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1463 return this.workerNodes[workerNodeKey].info
1464 }
1465
1466 /**
1467 * Adds the given worker in the pool worker nodes.
1468 *
1469 * @param worker - The worker.
1470 * @returns The added worker node key.
1471 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1472 */
1473 private addWorkerNode (worker: Worker): number {
1474 const workerNode = new WorkerNode<Worker, Data>(
1475 worker,
1476 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1477 )
1478 // Flag the worker node as ready at pool startup.
1479 if (this.starting) {
1480 workerNode.info.ready = true
1481 }
1482 this.workerNodes.push(workerNode)
1483 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1484 if (workerNodeKey === -1) {
1485 throw new Error('Worker added not found in worker nodes')
1486 }
1487 return workerNodeKey
1488 }
1489
1490 /**
1491 * Removes the given worker from the pool worker nodes.
1492 *
1493 * @param worker - The worker.
1494 */
1495 private removeWorkerNode (worker: Worker): void {
1496 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1497 if (workerNodeKey !== -1) {
1498 this.workerNodes.splice(workerNodeKey, 1)
1499 this.workerChoiceStrategyContext.remove(workerNodeKey)
1500 }
1501 }
1502
1503 /** @inheritDoc */
1504 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1505 return (
1506 this.opts.enableTasksQueue === true &&
1507 this.workerNodes[workerNodeKey].hasBackPressure()
1508 )
1509 }
1510
1511 private hasBackPressure (): boolean {
1512 return (
1513 this.opts.enableTasksQueue === true &&
1514 this.workerNodes.findIndex(
1515 workerNode => !workerNode.hasBackPressure()
1516 ) === -1
1517 )
1518 }
1519
1520 /**
1521 * Executes the given task on the worker given its worker node key.
1522 *
1523 * @param workerNodeKey - The worker node key.
1524 * @param task - The task to execute.
1525 */
1526 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1527 this.beforeTaskExecutionHook(workerNodeKey, task)
1528 this.sendToWorker(workerNodeKey, task, task.transferList)
1529 this.checkAndEmitTaskExecutionEvents()
1530 }
1531
1532 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1533 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1534 this.checkAndEmitTaskQueuingEvents()
1535 return tasksQueueSize
1536 }
1537
1538 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1539 return this.workerNodes[workerNodeKey].dequeueTask()
1540 }
1541
1542 private tasksQueueSize (workerNodeKey: number): number {
1543 return this.workerNodes[workerNodeKey].tasksQueueSize()
1544 }
1545
1546 protected flushTasksQueue (workerNodeKey: number): void {
1547 while (this.tasksQueueSize(workerNodeKey) > 0) {
1548 this.executeTask(
1549 workerNodeKey,
1550 this.dequeueTask(workerNodeKey) as Task<Data>
1551 )
1552 }
1553 this.workerNodes[workerNodeKey].clearTasksQueue()
1554 }
1555
1556 private flushTasksQueues (): void {
1557 for (const [workerNodeKey] of this.workerNodes.entries()) {
1558 this.flushTasksQueue(workerNodeKey)
1559 }
1560 }
1561 }