d415ac9cd827debe42315ff81580b94ef301ea70
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task,
9 Writable
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
15 average,
16 isKillBehavior,
17 isPlainObject,
18 median,
19 round,
20 updateMeasurementStatistics
21 } from '../utils'
22 import { KillBehaviors } from '../worker/worker-options'
23 import {
24 type IPool,
25 PoolEmitter,
26 PoolEvents,
27 type PoolInfo,
28 type PoolOptions,
29 type PoolType,
30 PoolTypes,
31 type TasksQueueOptions
32 } from './pool'
33 import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
39 } from './worker'
40 import {
41 type MeasurementStatisticsRequirements,
42 Measurements,
43 WorkerChoiceStrategies,
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
46 } from './selection-strategies/selection-strategies-types'
47 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
48 import { version } from './version'
49 import { WorkerNode } from './worker-node'
50
51 /**
52 * Base class that implements some shared logic for all poolifier pools.
53 *
54 * @typeParam Worker - Type of worker which manages this pool.
55 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
56 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
57 */
58 export abstract class AbstractPool<
59 Worker extends IWorker,
60 Data = unknown,
61 Response = unknown
62 > implements IPool<Worker, Data, Response> {
63 /** @inheritDoc */
64 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
65
66 /** @inheritDoc */
67 public readonly emitter?: PoolEmitter
68
69 /**
70 * The task execution response promise map.
71 *
72 * - `key`: The message id of each submitted task.
73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
74 *
75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
76 */
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
79
80 /**
81 * Worker choice strategy context referencing a worker choice algorithm implementation.
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
84 Worker,
85 Data,
86 Response
87 >
88
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
94 /**
95 * Whether the pool is starting or not.
96 */
97 private readonly starting: boolean
98 /**
99 * Whether the pool is started or not.
100 */
101 private started: boolean
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
107 /**
108 * Constructs a new poolifier pool.
109 *
110 * @param numberOfWorkers - Number of workers that this pool should manage.
111 * @param filePath - Path to the worker file.
112 * @param opts - Options for the pool.
113 */
114 public constructor (
115 protected readonly numberOfWorkers: number,
116 protected readonly filePath: string,
117 protected readonly opts: PoolOptions<Worker>
118 ) {
119 if (!this.isMain()) {
120 throw new Error(
121 'Cannot start a pool from a worker with the same type as the pool'
122 )
123 }
124 this.checkNumberOfWorkers(this.numberOfWorkers)
125 this.checkFilePath(this.filePath)
126 this.checkPoolOptions(this.opts)
127
128 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
129 this.executeTask = this.executeTask.bind(this)
130 this.enqueueTask = this.enqueueTask.bind(this)
131
132 if (this.opts.enableEvents === true) {
133 this.emitter = new PoolEmitter()
134 }
135 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
136 Worker,
137 Data,
138 Response
139 >(
140 this,
141 this.opts.workerChoiceStrategy,
142 this.opts.workerChoiceStrategyOptions
143 )
144
145 this.setupHook()
146
147 this.starting = true
148 this.startPool()
149 this.starting = false
150 this.started = true
151
152 this.startTimestamp = performance.now()
153 }
154
155 private checkFilePath (filePath: string): void {
156 if (
157 filePath == null ||
158 typeof filePath !== 'string' ||
159 (typeof filePath === 'string' && filePath.trim().length === 0)
160 ) {
161 throw new Error('Please specify a file with a worker implementation')
162 }
163 if (!existsSync(filePath)) {
164 throw new Error(`Cannot find the worker file '${filePath}'`)
165 }
166 }
167
168 private checkNumberOfWorkers (numberOfWorkers: number): void {
169 if (numberOfWorkers == null) {
170 throw new Error(
171 'Cannot instantiate a pool without specifying the number of workers'
172 )
173 } else if (!Number.isSafeInteger(numberOfWorkers)) {
174 throw new TypeError(
175 'Cannot instantiate a pool with a non safe integer number of workers'
176 )
177 } else if (numberOfWorkers < 0) {
178 throw new RangeError(
179 'Cannot instantiate a pool with a negative number of workers'
180 )
181 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
182 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
183 }
184 }
185
186 protected checkDynamicPoolSize (min: number, max: number): void {
187 if (this.type === PoolTypes.dynamic) {
188 if (max == null) {
189 throw new TypeError(
190 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
191 )
192 } else if (!Number.isSafeInteger(max)) {
193 throw new TypeError(
194 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
195 )
196 } else if (min > max) {
197 throw new RangeError(
198 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
199 )
200 } else if (max === 0) {
201 throw new RangeError(
202 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
203 )
204 } else if (min === max) {
205 throw new RangeError(
206 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
207 )
208 }
209 }
210 }
211
212 private checkPoolOptions (opts: PoolOptions<Worker>): void {
213 if (isPlainObject(opts)) {
214 this.opts.workerChoiceStrategy =
215 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
216 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
217 this.opts.workerChoiceStrategyOptions = {
218 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
219 ...opts.workerChoiceStrategyOptions
220 }
221 this.checkValidWorkerChoiceStrategyOptions(
222 this.opts.workerChoiceStrategyOptions
223 )
224 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
225 this.opts.enableEvents = opts.enableEvents ?? true
226 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
227 if (this.opts.enableTasksQueue) {
228 this.checkValidTasksQueueOptions(
229 opts.tasksQueueOptions as TasksQueueOptions
230 )
231 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
232 opts.tasksQueueOptions as TasksQueueOptions
233 )
234 }
235 } else {
236 throw new TypeError('Invalid pool options: must be a plain object')
237 }
238 }
239
240 private checkValidWorkerChoiceStrategy (
241 workerChoiceStrategy: WorkerChoiceStrategy
242 ): void {
243 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
244 throw new Error(
245 `Invalid worker choice strategy '${workerChoiceStrategy}'`
246 )
247 }
248 }
249
250 private checkValidWorkerChoiceStrategyOptions (
251 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
252 ): void {
253 if (!isPlainObject(workerChoiceStrategyOptions)) {
254 throw new TypeError(
255 'Invalid worker choice strategy options: must be a plain object'
256 )
257 }
258 if (
259 workerChoiceStrategyOptions.choiceRetries != null &&
260 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
261 ) {
262 throw new TypeError(
263 'Invalid worker choice strategy options: choice retries must be an integer'
264 )
265 }
266 if (
267 workerChoiceStrategyOptions.choiceRetries != null &&
268 workerChoiceStrategyOptions.choiceRetries <= 0
269 ) {
270 throw new RangeError(
271 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
272 )
273 }
274 if (
275 workerChoiceStrategyOptions.weights != null &&
276 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
277 ) {
278 throw new Error(
279 'Invalid worker choice strategy options: must have a weight for each worker node'
280 )
281 }
282 if (
283 workerChoiceStrategyOptions.measurement != null &&
284 !Object.values(Measurements).includes(
285 workerChoiceStrategyOptions.measurement
286 )
287 ) {
288 throw new Error(
289 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
290 )
291 }
292 }
293
294 private checkValidTasksQueueOptions (
295 tasksQueueOptions: Writable<TasksQueueOptions>
296 ): void {
297 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
298 throw new TypeError('Invalid tasks queue options: must be a plain object')
299 }
300 if (
301 tasksQueueOptions?.concurrency != null &&
302 !Number.isSafeInteger(tasksQueueOptions.concurrency)
303 ) {
304 throw new TypeError(
305 'Invalid worker node tasks concurrency: must be an integer'
306 )
307 }
308 if (
309 tasksQueueOptions?.concurrency != null &&
310 tasksQueueOptions.concurrency <= 0
311 ) {
312 throw new RangeError(
313 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
314 )
315 }
316 if (tasksQueueOptions?.queueMaxSize != null) {
317 throw new Error(
318 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
319 )
320 }
321 if (
322 tasksQueueOptions?.size != null &&
323 !Number.isSafeInteger(tasksQueueOptions.size)
324 ) {
325 throw new TypeError(
326 'Invalid worker node tasks queue size: must be an integer'
327 )
328 }
329 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
330 throw new RangeError(
331 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
332 )
333 }
334 }
335
336 private startPool (): void {
337 while (
338 this.workerNodes.reduce(
339 (accumulator, workerNode) =>
340 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
341 0
342 ) < this.numberOfWorkers
343 ) {
344 this.createAndSetupWorkerNode()
345 }
346 }
347
348 /** @inheritDoc */
349 public get info (): PoolInfo {
350 return {
351 version,
352 type: this.type,
353 worker: this.worker,
354 ready: this.ready,
355 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
356 minSize: this.minSize,
357 maxSize: this.maxSize,
358 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
359 .runTime.aggregate &&
360 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
361 .waitTime.aggregate && { utilization: round(this.utilization) }),
362 workerNodes: this.workerNodes.length,
363 idleWorkerNodes: this.workerNodes.reduce(
364 (accumulator, workerNode) =>
365 workerNode.usage.tasks.executing === 0
366 ? accumulator + 1
367 : accumulator,
368 0
369 ),
370 busyWorkerNodes: this.workerNodes.reduce(
371 (accumulator, workerNode) =>
372 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
373 0
374 ),
375 executedTasks: this.workerNodes.reduce(
376 (accumulator, workerNode) =>
377 accumulator + workerNode.usage.tasks.executed,
378 0
379 ),
380 executingTasks: this.workerNodes.reduce(
381 (accumulator, workerNode) =>
382 accumulator + workerNode.usage.tasks.executing,
383 0
384 ),
385 ...(this.opts.enableTasksQueue === true && {
386 queuedTasks: this.workerNodes.reduce(
387 (accumulator, workerNode) =>
388 accumulator + workerNode.usage.tasks.queued,
389 0
390 )
391 }),
392 ...(this.opts.enableTasksQueue === true && {
393 maxQueuedTasks: this.workerNodes.reduce(
394 (accumulator, workerNode) =>
395 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
396 0
397 )
398 }),
399 ...(this.opts.enableTasksQueue === true && {
400 backPressure: this.hasBackPressure()
401 }),
402 ...(this.opts.enableTasksQueue === true && {
403 stolenTasks: this.workerNodes.reduce(
404 (accumulator, workerNode) =>
405 accumulator + workerNode.usage.tasks.stolen,
406 0
407 )
408 }),
409 failedTasks: this.workerNodes.reduce(
410 (accumulator, workerNode) =>
411 accumulator + workerNode.usage.tasks.failed,
412 0
413 ),
414 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
415 .runTime.aggregate && {
416 runTime: {
417 minimum: round(
418 Math.min(
419 ...this.workerNodes.map(
420 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
421 )
422 )
423 ),
424 maximum: round(
425 Math.max(
426 ...this.workerNodes.map(
427 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
428 )
429 )
430 ),
431 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
432 .runTime.average && {
433 average: round(
434 average(
435 this.workerNodes.reduce<number[]>(
436 (accumulator, workerNode) =>
437 accumulator.concat(workerNode.usage.runTime.history),
438 []
439 )
440 )
441 )
442 }),
443 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
444 .runTime.median && {
445 median: round(
446 median(
447 this.workerNodes.reduce<number[]>(
448 (accumulator, workerNode) =>
449 accumulator.concat(workerNode.usage.runTime.history),
450 []
451 )
452 )
453 )
454 })
455 }
456 }),
457 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
458 .waitTime.aggregate && {
459 waitTime: {
460 minimum: round(
461 Math.min(
462 ...this.workerNodes.map(
463 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
464 )
465 )
466 ),
467 maximum: round(
468 Math.max(
469 ...this.workerNodes.map(
470 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
471 )
472 )
473 ),
474 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
475 .waitTime.average && {
476 average: round(
477 average(
478 this.workerNodes.reduce<number[]>(
479 (accumulator, workerNode) =>
480 accumulator.concat(workerNode.usage.waitTime.history),
481 []
482 )
483 )
484 )
485 }),
486 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
487 .waitTime.median && {
488 median: round(
489 median(
490 this.workerNodes.reduce<number[]>(
491 (accumulator, workerNode) =>
492 accumulator.concat(workerNode.usage.waitTime.history),
493 []
494 )
495 )
496 )
497 })
498 }
499 })
500 }
501 }
502
503 /**
504 * The pool readiness boolean status.
505 */
506 private get ready (): boolean {
507 return (
508 this.workerNodes.reduce(
509 (accumulator, workerNode) =>
510 !workerNode.info.dynamic && workerNode.info.ready
511 ? accumulator + 1
512 : accumulator,
513 0
514 ) >= this.minSize
515 )
516 }
517
518 /**
519 * The approximate pool utilization.
520 *
521 * @returns The pool utilization.
522 */
523 private get utilization (): number {
524 const poolTimeCapacity =
525 (performance.now() - this.startTimestamp) * this.maxSize
526 const totalTasksRunTime = this.workerNodes.reduce(
527 (accumulator, workerNode) =>
528 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
529 0
530 )
531 const totalTasksWaitTime = this.workerNodes.reduce(
532 (accumulator, workerNode) =>
533 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
534 0
535 )
536 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
537 }
538
539 /**
540 * The pool type.
541 *
542 * If it is `'dynamic'`, it provides the `max` property.
543 */
544 protected abstract get type (): PoolType
545
546 /**
547 * The worker type.
548 */
549 protected abstract get worker (): WorkerType
550
551 /**
552 * The pool minimum size.
553 */
554 protected get minSize (): number {
555 return this.numberOfWorkers
556 }
557
558 /**
559 * The pool maximum size.
560 */
561 protected get maxSize (): number {
562 return this.max ?? this.numberOfWorkers
563 }
564
565 /**
566 * Checks if the worker id sent in the received message from a worker is valid.
567 *
568 * @param message - The received message.
569 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
570 */
571 private checkMessageWorkerId (message: MessageValue<Response>): void {
572 if (message.workerId == null) {
573 throw new Error('Worker message received without worker id')
574 } else if (
575 message.workerId != null &&
576 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
577 ) {
578 throw new Error(
579 `Worker message received from unknown worker '${message.workerId}'`
580 )
581 }
582 }
583
584 /**
585 * Gets the given worker its worker node key.
586 *
587 * @param worker - The worker.
588 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
589 */
590 private getWorkerNodeKeyByWorker (worker: Worker): number {
591 return this.workerNodes.findIndex(
592 (workerNode) => workerNode.worker === worker
593 )
594 }
595
596 /**
597 * Gets the worker node key given its worker id.
598 *
599 * @param workerId - The worker id.
600 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
601 */
602 private getWorkerNodeKeyByWorkerId (workerId: number): number {
603 return this.workerNodes.findIndex(
604 (workerNode) => workerNode.info.id === workerId
605 )
606 }
607
608 /** @inheritDoc */
609 public setWorkerChoiceStrategy (
610 workerChoiceStrategy: WorkerChoiceStrategy,
611 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
612 ): void {
613 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
614 this.opts.workerChoiceStrategy = workerChoiceStrategy
615 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
616 this.opts.workerChoiceStrategy
617 )
618 if (workerChoiceStrategyOptions != null) {
619 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
620 }
621 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
622 workerNode.resetUsage()
623 this.sendStatisticsMessageToWorker(workerNodeKey)
624 }
625 }
626
627 /** @inheritDoc */
628 public setWorkerChoiceStrategyOptions (
629 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
630 ): void {
631 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
632 this.opts.workerChoiceStrategyOptions = {
633 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
634 ...workerChoiceStrategyOptions
635 }
636 this.workerChoiceStrategyContext.setOptions(
637 this.opts.workerChoiceStrategyOptions
638 )
639 }
640
641 /** @inheritDoc */
642 public enableTasksQueue (
643 enable: boolean,
644 tasksQueueOptions?: TasksQueueOptions
645 ): void {
646 if (this.opts.enableTasksQueue === true && !enable) {
647 this.flushTasksQueues()
648 }
649 this.opts.enableTasksQueue = enable
650 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
651 }
652
653 /** @inheritDoc */
654 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
655 if (this.opts.enableTasksQueue === true) {
656 this.checkValidTasksQueueOptions(tasksQueueOptions)
657 this.opts.tasksQueueOptions =
658 this.buildTasksQueueOptions(tasksQueueOptions)
659 this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number)
660 } else if (this.opts.tasksQueueOptions != null) {
661 delete this.opts.tasksQueueOptions
662 }
663 }
664
665 private setTasksQueueMaxSize (size: number): void {
666 for (const workerNode of this.workerNodes) {
667 workerNode.tasksQueueBackPressureSize = size
668 }
669 }
670
671 private buildTasksQueueOptions (
672 tasksQueueOptions: TasksQueueOptions
673 ): TasksQueueOptions {
674 return {
675 ...{
676 size: Math.pow(this.maxSize, 2),
677 concurrency: 1
678 },
679 ...tasksQueueOptions
680 }
681 }
682
683 /**
684 * Whether the pool is full or not.
685 *
686 * The pool filling boolean status.
687 */
688 protected get full (): boolean {
689 return this.workerNodes.length >= this.maxSize
690 }
691
692 /**
693 * Whether the pool is busy or not.
694 *
695 * The pool busyness boolean status.
696 */
697 protected abstract get busy (): boolean
698
699 /**
700 * Whether worker nodes are executing concurrently their tasks quota or not.
701 *
702 * @returns Worker nodes busyness boolean status.
703 */
704 protected internalBusy (): boolean {
705 if (this.opts.enableTasksQueue === true) {
706 return (
707 this.workerNodes.findIndex(
708 (workerNode) =>
709 workerNode.info.ready &&
710 workerNode.usage.tasks.executing <
711 (this.opts.tasksQueueOptions?.concurrency as number)
712 ) === -1
713 )
714 } else {
715 return (
716 this.workerNodes.findIndex(
717 (workerNode) =>
718 workerNode.info.ready && workerNode.usage.tasks.executing === 0
719 ) === -1
720 )
721 }
722 }
723
724 /** @inheritDoc */
725 public listTaskFunctions (): string[] {
726 for (const workerNode of this.workerNodes) {
727 if (
728 Array.isArray(workerNode.info.taskFunctions) &&
729 workerNode.info.taskFunctions.length > 0
730 ) {
731 return workerNode.info.taskFunctions
732 }
733 }
734 return []
735 }
736
737 /** @inheritDoc */
738 public async execute (
739 data?: Data,
740 name?: string,
741 transferList?: TransferListItem[]
742 ): Promise<Response> {
743 return await new Promise<Response>((resolve, reject) => {
744 if (!this.started) {
745 reject(new Error('Cannot execute a task on destroyed pool'))
746 }
747 if (name != null && typeof name !== 'string') {
748 reject(new TypeError('name argument must be a string'))
749 }
750 if (
751 name != null &&
752 typeof name === 'string' &&
753 name.trim().length === 0
754 ) {
755 reject(new TypeError('name argument must not be an empty string'))
756 }
757 if (transferList != null && !Array.isArray(transferList)) {
758 reject(new TypeError('transferList argument must be an array'))
759 }
760 const timestamp = performance.now()
761 const workerNodeKey = this.chooseWorkerNode()
762 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
763 if (
764 name != null &&
765 Array.isArray(workerInfo.taskFunctions) &&
766 !workerInfo.taskFunctions.includes(name)
767 ) {
768 reject(
769 new Error(`Task function '${name}' is not registered in the pool`)
770 )
771 }
772 const task: Task<Data> = {
773 name: name ?? DEFAULT_TASK_NAME,
774 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
775 data: data ?? ({} as Data),
776 transferList,
777 timestamp,
778 workerId: workerInfo.id as number,
779 taskId: randomUUID()
780 }
781 this.promiseResponseMap.set(task.taskId as string, {
782 resolve,
783 reject,
784 workerNodeKey
785 })
786 if (
787 this.opts.enableTasksQueue === false ||
788 (this.opts.enableTasksQueue === true &&
789 this.tasksQueueSize(workerNodeKey) === 0 &&
790 this.workerNodes[workerNodeKey].usage.tasks.executing <
791 (this.opts.tasksQueueOptions?.concurrency as number))
792 ) {
793 this.executeTask(workerNodeKey, task)
794 } else {
795 this.enqueueTask(workerNodeKey, task)
796 }
797 })
798 }
799
800 /** @inheritDoc */
801 public async destroy (): Promise<void> {
802 await Promise.all(
803 this.workerNodes.map(async (_, workerNodeKey) => {
804 await this.destroyWorkerNode(workerNodeKey)
805 })
806 )
807 this.emitter?.emit(PoolEvents.destroy, this.info)
808 this.started = false
809 }
810
811 protected async sendKillMessageToWorker (
812 workerNodeKey: number,
813 workerId: number
814 ): Promise<void> {
815 await new Promise<void>((resolve, reject) => {
816 this.registerWorkerMessageListener(workerNodeKey, (message) => {
817 if (message.kill === 'success') {
818 resolve()
819 } else if (message.kill === 'failure') {
820 reject(new Error(`Worker ${workerId} kill message handling failed`))
821 }
822 })
823 this.sendToWorker(workerNodeKey, { kill: true, workerId })
824 })
825 }
826
827 /**
828 * Terminates the worker node given its worker node key.
829 *
830 * @param workerNodeKey - The worker node key.
831 */
832 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
833
834 /**
835 * Setup hook to execute code before worker nodes are created in the abstract constructor.
836 * Can be overridden.
837 *
838 * @virtual
839 */
840 protected setupHook (): void {
841 /** Intentionally empty */
842 }
843
844 /**
845 * Should return whether the worker is the main worker or not.
846 */
847 protected abstract isMain (): boolean
848
849 /**
850 * Hook executed before the worker task execution.
851 * Can be overridden.
852 *
853 * @param workerNodeKey - The worker node key.
854 * @param task - The task to execute.
855 */
856 protected beforeTaskExecutionHook (
857 workerNodeKey: number,
858 task: Task<Data>
859 ): void {
860 if (this.workerNodes[workerNodeKey]?.usage != null) {
861 const workerUsage = this.workerNodes[workerNodeKey].usage
862 ++workerUsage.tasks.executing
863 this.updateWaitTimeWorkerUsage(workerUsage, task)
864 }
865 if (
866 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
867 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
868 task.name as string
869 ) != null
870 ) {
871 const taskFunctionWorkerUsage = this.workerNodes[
872 workerNodeKey
873 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
874 ++taskFunctionWorkerUsage.tasks.executing
875 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
876 }
877 }
878
879 /**
880 * Hook executed after the worker task execution.
881 * Can be overridden.
882 *
883 * @param workerNodeKey - The worker node key.
884 * @param message - The received message.
885 */
886 protected afterTaskExecutionHook (
887 workerNodeKey: number,
888 message: MessageValue<Response>
889 ): void {
890 if (this.workerNodes[workerNodeKey]?.usage != null) {
891 const workerUsage = this.workerNodes[workerNodeKey].usage
892 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
893 this.updateRunTimeWorkerUsage(workerUsage, message)
894 this.updateEluWorkerUsage(workerUsage, message)
895 }
896 if (
897 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
898 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
899 message.taskPerformance?.name as string
900 ) != null
901 ) {
902 const taskFunctionWorkerUsage = this.workerNodes[
903 workerNodeKey
904 ].getTaskFunctionWorkerUsage(
905 message.taskPerformance?.name as string
906 ) as WorkerUsage
907 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
908 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
909 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
910 }
911 }
912
913 /**
914 * Whether the worker node shall update its task function worker usage or not.
915 *
916 * @param workerNodeKey - The worker node key.
917 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
918 */
919 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
920 const workerInfo = this.getWorkerInfo(workerNodeKey)
921 return (
922 workerInfo != null &&
923 Array.isArray(workerInfo.taskFunctions) &&
924 workerInfo.taskFunctions.length > 2
925 )
926 }
927
928 private updateTaskStatisticsWorkerUsage (
929 workerUsage: WorkerUsage,
930 message: MessageValue<Response>
931 ): void {
932 const workerTaskStatistics = workerUsage.tasks
933 if (
934 workerTaskStatistics.executing != null &&
935 workerTaskStatistics.executing > 0
936 ) {
937 --workerTaskStatistics.executing
938 }
939 if (message.taskError == null) {
940 ++workerTaskStatistics.executed
941 } else {
942 ++workerTaskStatistics.failed
943 }
944 }
945
946 private updateRunTimeWorkerUsage (
947 workerUsage: WorkerUsage,
948 message: MessageValue<Response>
949 ): void {
950 if (message.taskError != null) {
951 return
952 }
953 updateMeasurementStatistics(
954 workerUsage.runTime,
955 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
956 message.taskPerformance?.runTime ?? 0
957 )
958 }
959
960 private updateWaitTimeWorkerUsage (
961 workerUsage: WorkerUsage,
962 task: Task<Data>
963 ): void {
964 const timestamp = performance.now()
965 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
966 updateMeasurementStatistics(
967 workerUsage.waitTime,
968 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
969 taskWaitTime
970 )
971 }
972
973 private updateEluWorkerUsage (
974 workerUsage: WorkerUsage,
975 message: MessageValue<Response>
976 ): void {
977 if (message.taskError != null) {
978 return
979 }
980 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
981 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
982 updateMeasurementStatistics(
983 workerUsage.elu.active,
984 eluTaskStatisticsRequirements,
985 message.taskPerformance?.elu?.active ?? 0
986 )
987 updateMeasurementStatistics(
988 workerUsage.elu.idle,
989 eluTaskStatisticsRequirements,
990 message.taskPerformance?.elu?.idle ?? 0
991 )
992 if (eluTaskStatisticsRequirements.aggregate) {
993 if (message.taskPerformance?.elu != null) {
994 if (workerUsage.elu.utilization != null) {
995 workerUsage.elu.utilization =
996 (workerUsage.elu.utilization +
997 message.taskPerformance.elu.utilization) /
998 2
999 } else {
1000 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1001 }
1002 }
1003 }
1004 }
1005
1006 /**
1007 * Chooses a worker node for the next task.
1008 *
1009 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
1010 *
1011 * @returns The chosen worker node key
1012 */
1013 private chooseWorkerNode (): number {
1014 if (this.shallCreateDynamicWorker()) {
1015 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
1016 if (
1017 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1018 ) {
1019 return workerNodeKey
1020 }
1021 }
1022 return this.workerChoiceStrategyContext.execute()
1023 }
1024
1025 /**
1026 * Conditions for dynamic worker creation.
1027 *
1028 * @returns Whether to create a dynamic worker or not.
1029 */
1030 private shallCreateDynamicWorker (): boolean {
1031 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
1032 }
1033
1034 /**
1035 * Sends a message to worker given its worker node key.
1036 *
1037 * @param workerNodeKey - The worker node key.
1038 * @param message - The message.
1039 * @param transferList - The optional array of transferable objects.
1040 */
1041 protected abstract sendToWorker (
1042 workerNodeKey: number,
1043 message: MessageValue<Data>,
1044 transferList?: TransferListItem[]
1045 ): void
1046
1047 /**
1048 * Creates a new worker.
1049 *
1050 * @returns Newly created worker.
1051 */
1052 protected abstract createWorker (): Worker
1053
1054 /**
1055 * Creates a new, completely set up worker node.
1056 *
1057 * @returns New, completely set up worker node key.
1058 */
1059 protected createAndSetupWorkerNode (): number {
1060 const worker = this.createWorker()
1061
1062 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1063 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1064 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1065 worker.on('error', (error) => {
1066 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1067 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
1068 workerInfo.ready = false
1069 this.workerNodes[workerNodeKey].closeChannel()
1070 this.emitter?.emit(PoolEvents.error, error)
1071 if (
1072 this.opts.restartWorkerOnError === true &&
1073 !this.starting &&
1074 this.started
1075 ) {
1076 if (workerInfo.dynamic) {
1077 this.createAndSetupDynamicWorkerNode()
1078 } else {
1079 this.createAndSetupWorkerNode()
1080 }
1081 }
1082 if (this.opts.enableTasksQueue === true) {
1083 this.redistributeQueuedTasks(workerNodeKey)
1084 }
1085 })
1086 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1087 worker.once('exit', () => {
1088 this.removeWorkerNode(worker)
1089 })
1090
1091 const workerNodeKey = this.addWorkerNode(worker)
1092
1093 this.afterWorkerNodeSetup(workerNodeKey)
1094
1095 return workerNodeKey
1096 }
1097
1098 /**
1099 * Creates a new, completely set up dynamic worker node.
1100 *
1101 * @returns New, completely set up dynamic worker node key.
1102 */
1103 protected createAndSetupDynamicWorkerNode (): number {
1104 const workerNodeKey = this.createAndSetupWorkerNode()
1105 this.registerWorkerMessageListener(workerNodeKey, (message) => {
1106 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1107 message.workerId
1108 )
1109 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1110 // Kill message received from worker
1111 if (
1112 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1113 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1114 ((this.opts.enableTasksQueue === false &&
1115 workerUsage.tasks.executing === 0) ||
1116 (this.opts.enableTasksQueue === true &&
1117 workerUsage.tasks.executing === 0 &&
1118 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1119 ) {
1120 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1121 this.emitter?.emit(PoolEvents.error, error)
1122 })
1123 }
1124 })
1125 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
1126 this.sendToWorker(workerNodeKey, {
1127 checkActive: true,
1128 workerId: workerInfo.id as number
1129 })
1130 workerInfo.dynamic = true
1131 if (
1132 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1133 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1134 ) {
1135 workerInfo.ready = true
1136 }
1137 this.checkAndEmitDynamicWorkerCreationEvents()
1138 return workerNodeKey
1139 }
1140
1141 /**
1142 * Registers a listener callback on the worker given its worker node key.
1143 *
1144 * @param workerNodeKey - The worker node key.
1145 * @param listener - The message listener callback.
1146 */
1147 protected abstract registerWorkerMessageListener<
1148 Message extends Data | Response
1149 >(
1150 workerNodeKey: number,
1151 listener: (message: MessageValue<Message>) => void
1152 ): void
1153
1154 /**
1155 * Method hooked up after a worker node has been newly created.
1156 * Can be overridden.
1157 *
1158 * @param workerNodeKey - The newly created worker node key.
1159 */
1160 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1161 // Listen to worker messages.
1162 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1163 // Send the startup message to worker.
1164 this.sendStartupMessageToWorker(workerNodeKey)
1165 // Send the statistics message to worker.
1166 this.sendStatisticsMessageToWorker(workerNodeKey)
1167 if (this.opts.enableTasksQueue === true) {
1168 this.workerNodes[workerNodeKey].onEmptyQueue =
1169 this.taskStealingOnEmptyQueue.bind(this)
1170 this.workerNodes[workerNodeKey].onBackPressure =
1171 this.tasksStealingOnBackPressure.bind(this)
1172 }
1173 }
1174
1175 /**
1176 * Sends the startup message to worker given its worker node key.
1177 *
1178 * @param workerNodeKey - The worker node key.
1179 */
1180 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1181
1182 /**
1183 * Sends the statistics message to worker given its worker node key.
1184 *
1185 * @param workerNodeKey - The worker node key.
1186 */
1187 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1188 this.sendToWorker(workerNodeKey, {
1189 statistics: {
1190 runTime:
1191 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1192 .runTime.aggregate,
1193 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1194 .elu.aggregate
1195 },
1196 workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
1197 })
1198 }
1199
1200 private redistributeQueuedTasks (workerNodeKey: number): void {
1201 while (this.tasksQueueSize(workerNodeKey) > 0) {
1202 let destinationWorkerNodeKey!: number
1203 let minQueuedTasks = Infinity
1204 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1205 if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
1206 if (workerNode.usage.tasks.queued === 0) {
1207 destinationWorkerNodeKey = workerNodeId
1208 break
1209 }
1210 if (workerNode.usage.tasks.queued < minQueuedTasks) {
1211 minQueuedTasks = workerNode.usage.tasks.queued
1212 destinationWorkerNodeKey = workerNodeId
1213 }
1214 }
1215 }
1216 if (destinationWorkerNodeKey != null) {
1217 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
1218 const task = {
1219 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
1220 workerId: destinationWorkerNode.info.id as number
1221 }
1222 if (
1223 this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
1224 destinationWorkerNode.usage.tasks.executing <
1225 (this.opts.tasksQueueOptions?.concurrency as number)
1226 ) {
1227 this.executeTask(destinationWorkerNodeKey, task)
1228 } else {
1229 this.enqueueTask(destinationWorkerNodeKey, task)
1230 }
1231 }
1232 }
1233 }
1234
1235 private taskStealingOnEmptyQueue (workerId: number): void {
1236 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1237 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
1238 const workerNodes = this.workerNodes
1239 .slice()
1240 .sort(
1241 (workerNodeA, workerNodeB) =>
1242 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1243 )
1244 for (const sourceWorkerNode of workerNodes) {
1245 if (sourceWorkerNode.usage.tasks.queued === 0) {
1246 break
1247 }
1248 if (
1249 sourceWorkerNode.info.ready &&
1250 sourceWorkerNode.info.id !== workerId &&
1251 sourceWorkerNode.usage.tasks.queued > 0
1252 ) {
1253 const task = {
1254 ...(sourceWorkerNode.popTask() as Task<Data>),
1255 workerId: destinationWorkerNode.info.id as number
1256 }
1257 if (
1258 this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
1259 destinationWorkerNode.usage.tasks.executing <
1260 (this.opts.tasksQueueOptions?.concurrency as number)
1261 ) {
1262 this.executeTask(destinationWorkerNodeKey, task)
1263 } else {
1264 this.enqueueTask(destinationWorkerNodeKey, task)
1265 }
1266 if (destinationWorkerNode?.usage != null) {
1267 ++destinationWorkerNode.usage.tasks.stolen
1268 }
1269 if (
1270 this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) &&
1271 destinationWorkerNode.getTaskFunctionWorkerUsage(
1272 task.name as string
1273 ) != null
1274 ) {
1275 const taskFunctionWorkerUsage =
1276 destinationWorkerNode.getTaskFunctionWorkerUsage(
1277 task.name as string
1278 ) as WorkerUsage
1279 ++taskFunctionWorkerUsage.tasks.stolen
1280 }
1281 break
1282 }
1283 }
1284 }
1285
1286 private tasksStealingOnBackPressure (workerId: number): void {
1287 if ((this.opts.tasksQueueOptions?.size as number) <= 1) {
1288 return
1289 }
1290 const sourceWorkerNode =
1291 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1292 const workerNodes = this.workerNodes
1293 .slice()
1294 .sort(
1295 (workerNodeA, workerNodeB) =>
1296 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1297 )
1298 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1299 if (
1300 sourceWorkerNode.usage.tasks.queued > 0 &&
1301 workerNode.info.ready &&
1302 workerNode.info.id !== workerId &&
1303 workerNode.usage.tasks.queued <
1304 (this.opts.tasksQueueOptions?.size as number) - 1
1305 ) {
1306 const task = {
1307 ...(sourceWorkerNode.popTask() as Task<Data>),
1308 workerId: workerNode.info.id as number
1309 }
1310 if (
1311 this.tasksQueueSize(workerNodeKey) === 0 &&
1312 workerNode.usage.tasks.executing <
1313 (this.opts.tasksQueueOptions?.concurrency as number)
1314 ) {
1315 this.executeTask(workerNodeKey, task)
1316 } else {
1317 this.enqueueTask(workerNodeKey, task)
1318 }
1319 if (workerNode?.usage != null) {
1320 ++workerNode.usage.tasks.stolen
1321 }
1322 if (
1323 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1324 workerNode.getTaskFunctionWorkerUsage(task.name as string) != null
1325 ) {
1326 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1327 task.name as string
1328 ) as WorkerUsage
1329 ++taskFunctionWorkerUsage.tasks.stolen
1330 }
1331 }
1332 }
1333 }
1334
1335 /**
1336 * This method is the listener registered for each worker message.
1337 *
1338 * @returns The listener function to execute when a message is received from a worker.
1339 */
1340 protected workerListener (): (message: MessageValue<Response>) => void {
1341 return (message) => {
1342 this.checkMessageWorkerId(message)
1343 if (message.ready != null && message.taskFunctions != null) {
1344 // Worker ready response received from worker
1345 this.handleWorkerReadyResponse(message)
1346 } else if (message.taskId != null) {
1347 // Task execution response received from worker
1348 this.handleTaskExecutionResponse(message)
1349 } else if (message.taskFunctions != null) {
1350 // Task functions message received from worker
1351 (
1352 this.getWorkerInfo(
1353 this.getWorkerNodeKeyByWorkerId(message.workerId)
1354 ) as WorkerInfo
1355 ).taskFunctions = message.taskFunctions
1356 }
1357 }
1358 }
1359
1360 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1361 if (message.ready === false) {
1362 throw new Error(`Worker ${message.workerId} failed to initialize`)
1363 }
1364 const workerInfo = this.getWorkerInfo(
1365 this.getWorkerNodeKeyByWorkerId(message.workerId)
1366 ) as WorkerInfo
1367 workerInfo.ready = message.ready as boolean
1368 workerInfo.taskFunctions = message.taskFunctions
1369 if (this.emitter != null && this.ready) {
1370 this.emitter.emit(PoolEvents.ready, this.info)
1371 }
1372 }
1373
1374 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1375 const { taskId, taskError, data } = message
1376 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1377 if (promiseResponse != null) {
1378 if (taskError != null) {
1379 this.emitter?.emit(PoolEvents.taskError, taskError)
1380 promiseResponse.reject(taskError.message)
1381 } else {
1382 promiseResponse.resolve(data as Response)
1383 }
1384 const workerNodeKey = promiseResponse.workerNodeKey
1385 this.afterTaskExecutionHook(workerNodeKey, message)
1386 this.promiseResponseMap.delete(taskId as string)
1387 if (
1388 this.opts.enableTasksQueue === true &&
1389 this.tasksQueueSize(workerNodeKey) > 0 &&
1390 this.workerNodes[workerNodeKey].usage.tasks.executing <
1391 (this.opts.tasksQueueOptions?.concurrency as number)
1392 ) {
1393 this.executeTask(
1394 workerNodeKey,
1395 this.dequeueTask(workerNodeKey) as Task<Data>
1396 )
1397 }
1398 this.workerChoiceStrategyContext.update(workerNodeKey)
1399 }
1400 }
1401
1402 private checkAndEmitTaskExecutionEvents (): void {
1403 if (this.busy) {
1404 this.emitter?.emit(PoolEvents.busy, this.info)
1405 }
1406 }
1407
1408 private checkAndEmitTaskQueuingEvents (): void {
1409 if (this.hasBackPressure()) {
1410 this.emitter?.emit(PoolEvents.backPressure, this.info)
1411 }
1412 }
1413
1414 private checkAndEmitDynamicWorkerCreationEvents (): void {
1415 if (this.type === PoolTypes.dynamic) {
1416 if (this.full) {
1417 this.emitter?.emit(PoolEvents.full, this.info)
1418 }
1419 }
1420 }
1421
1422 /**
1423 * Gets the worker information given its worker node key.
1424 *
1425 * @param workerNodeKey - The worker node key.
1426 * @returns The worker information.
1427 */
1428 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1429 return this.workerNodes[workerNodeKey]?.info
1430 }
1431
1432 /**
1433 * Adds the given worker in the pool worker nodes.
1434 *
1435 * @param worker - The worker.
1436 * @returns The added worker node key.
1437 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1438 */
1439 private addWorkerNode (worker: Worker): number {
1440 const workerNode = new WorkerNode<Worker, Data>(
1441 worker,
1442 this.worker,
1443 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
1444 )
1445 // Flag the worker node as ready at pool startup.
1446 if (this.starting) {
1447 workerNode.info.ready = true
1448 }
1449 this.workerNodes.push(workerNode)
1450 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1451 if (workerNodeKey === -1) {
1452 throw new Error('Worker node added not found')
1453 }
1454 return workerNodeKey
1455 }
1456
1457 /**
1458 * Removes the given worker from the pool worker nodes.
1459 *
1460 * @param worker - The worker.
1461 */
1462 private removeWorkerNode (worker: Worker): void {
1463 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1464 if (workerNodeKey !== -1) {
1465 this.workerNodes.splice(workerNodeKey, 1)
1466 this.workerChoiceStrategyContext.remove(workerNodeKey)
1467 }
1468 }
1469
1470 /** @inheritDoc */
1471 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1472 return (
1473 this.opts.enableTasksQueue === true &&
1474 this.workerNodes[workerNodeKey].hasBackPressure()
1475 )
1476 }
1477
1478 private hasBackPressure (): boolean {
1479 return (
1480 this.opts.enableTasksQueue === true &&
1481 this.workerNodes.findIndex(
1482 (workerNode) => !workerNode.hasBackPressure()
1483 ) === -1
1484 )
1485 }
1486
1487 /**
1488 * Executes the given task on the worker given its worker node key.
1489 *
1490 * @param workerNodeKey - The worker node key.
1491 * @param task - The task to execute.
1492 */
1493 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1494 this.beforeTaskExecutionHook(workerNodeKey, task)
1495 this.sendToWorker(workerNodeKey, task, task.transferList)
1496 this.checkAndEmitTaskExecutionEvents()
1497 }
1498
1499 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1500 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1501 this.checkAndEmitTaskQueuingEvents()
1502 return tasksQueueSize
1503 }
1504
1505 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1506 return this.workerNodes[workerNodeKey].dequeueTask()
1507 }
1508
1509 private tasksQueueSize (workerNodeKey: number): number {
1510 return this.workerNodes[workerNodeKey].tasksQueueSize()
1511 }
1512
1513 protected flushTasksQueue (workerNodeKey: number): void {
1514 while (this.tasksQueueSize(workerNodeKey) > 0) {
1515 this.executeTask(
1516 workerNodeKey,
1517 this.dequeueTask(workerNodeKey) as Task<Data>
1518 )
1519 }
1520 this.workerNodes[workerNodeKey].clearTasksQueue()
1521 }
1522
1523 private flushTasksQueues (): void {
1524 for (const [workerNodeKey] of this.workerNodes.entries()) {
1525 this.flushTasksQueue(workerNodeKey)
1526 }
1527 }
1528 }