feat: continuous task stealing
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task,
9 Writable
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
15 average,
16 isKillBehavior,
17 isPlainObject,
18 median,
19 round,
20 updateMeasurementStatistics
21 } from '../utils'
22 import { KillBehaviors } from '../worker/worker-options'
23 import {
24 type IPool,
25 PoolEmitter,
26 PoolEvents,
27 type PoolInfo,
28 type PoolOptions,
29 type PoolType,
30 PoolTypes,
31 type TasksQueueOptions
32 } from './pool'
33 import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
39 } from './worker'
40 import {
41 type MeasurementStatisticsRequirements,
42 Measurements,
43 WorkerChoiceStrategies,
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
46 } from './selection-strategies/selection-strategies-types'
47 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
48 import { version } from './version'
49 import { WorkerNode } from './worker-node'
50
51 /**
52 * Base class that implements some shared logic for all poolifier pools.
53 *
54 * @typeParam Worker - Type of worker which manages this pool.
55 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
56 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
57 */
58 export abstract class AbstractPool<
59 Worker extends IWorker,
60 Data = unknown,
61 Response = unknown
62 > implements IPool<Worker, Data, Response> {
63 /** @inheritDoc */
64 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
65
66 /** @inheritDoc */
67 public readonly emitter?: PoolEmitter
68
69 /**
70 * The task execution response promise map.
71 *
72 * - `key`: The message id of each submitted task.
73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
74 *
75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
76 */
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
79
80 /**
81 * Worker choice strategy context referencing a worker choice algorithm implementation.
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
84 Worker,
85 Data,
86 Response
87 >
88
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
94 /**
95 * Whether the pool is starting or not.
96 */
97 private readonly starting: boolean
98 /**
99 * Whether the pool is started or not.
100 */
101 private started: boolean
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first (main-context check, number of workers, worker file path, pool options).
 * Then the optional event emitter and the worker choice strategy context are created,
 * the setup hook is run and the pool is started.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const createdFromMain = this.isMain()
  if (!createdFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation, each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind these methods to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  // The emitter only exists when events are enabled in the pool options.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // `starting` is only true while the initial worker nodes are being created.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
154
/**
 * Checks that a worker file path is given and that the file exists.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is nullish, not a string, blank, or does not exist on disk.
 */
private checkFilePath (filePath: string): void {
  // A usable path is a string with at least one non-whitespace character.
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
167
/**
 * Checks the number of workers given at pool construction.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is nullish.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
185
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not `'dynamic'`.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `max` is nullish or not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `min` is greater than `max`, `max` is zero, or `min` equals `max`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
211
/**
 * Checks the pool options and writes the resolved values (defaults applied) into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy, round robin when unspecified.
  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  // Worker choice strategy options, given values override the defaults.
  const workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  // Boolean flags defaults.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    this.checkValidTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
239
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
249
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `choiceRetries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `choiceRetries` is lower than or equal to zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` keys differs from the pool maximum size or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // One weight is expected per worker node of a pool at its maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
293
/**
 * Checks the tasks queue options.
 *
 * When `queueMaxSize` is given, its value is copied into `size` on the passed object before `size` is checked.
 *
 * @param tasksQueueOptions - The tasks queue options to check. May be nullish, in which case nothing is checked.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is lower than or equal to zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If both `queueMaxSize` and `size` are given.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: Writable<TasksQueueOptions>
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }

  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }

  const queueMaxSize = tasksQueueOptions?.queueMaxSize
  if (queueMaxSize != null) {
    if (tasksQueueOptions?.size != null) {
      throw new Error(
        'Invalid tasks queue options: cannot specify both queueMaxSize and size'
      )
    }
    // Mirror queueMaxSize into size so the checks below apply to it.
    tasksQueueOptions.size = queueMaxSize
  }

  const size = tasksQueueOptions?.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue max size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue max size: ${size} is a negative integer or zero`
      )
    }
  }
}
341
/**
 * Creates and sets up worker nodes until the number of non-dynamic worker nodes
 * reaches the configured number of workers.
 */
private startPool (): void {
  // Recomputed on every iteration since each creation changes the worker nodes.
  const nonDynamicWorkerNodesCount = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodesCount() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
353
/** @inheritDoc */
public get info (): PoolInfo {
  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(matches).length
  // Sums a numeric value picked from each worker node.
  const sumOverWorkerNodes = (
    valueOf: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + valueOf(workerNode),
      0
    )
  // Concatenates a measurement history picked from each worker node.
  const mergeHistories = (
    historyOf: (workerNode: IWorkerNode<Worker, Data>) => number[]
  ): number[] =>
    this.workerNodes.reduce<number[]>(
      (merged, workerNode) => merged.concat(historyOf(workerNode)),
      []
    )
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Tasks queue related fields are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    // Run time statistics, reported when the strategy aggregates run time.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.average && {
          average: round(
            average(
              mergeHistories((workerNode) => workerNode.usage.runTime.history)
            )
          )
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(
            median(
              mergeHistories((workerNode) => workerNode.usage.runTime.history)
            )
          )
        })
      }
    }),
    // Wait time statistics, reported when the strategy aggregates wait time.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.average && {
          average: round(
            average(
              mergeHistories((workerNode) => workerNode.usage.waitTime.history)
            )
          )
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(
            median(
              mergeHistories((workerNode) => workerNode.usage.waitTime.history)
            )
          )
        })
      }
    })
  }
}
508
/**
 * The pool readiness boolean status.
 *
 * `true` when the number of non-dynamic worker nodes flagged as ready is at least the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
523
/**
 * The approximate pool utilization.
 *
 * Computed as the sum of the worker nodes aggregated run time and wait time,
 * divided by the time elapsed since the pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
544
/**
 * The pool type, to be provided by the concrete pool implementation.
 *
 * A pool of type `'dynamic'` provides the `max` property.
 */
protected abstract get type (): PoolType
551
/**
 * The worker type, to be provided by the concrete pool implementation.
 */
protected abstract get worker (): WorkerType
556
/**
 * The pool minimum size: the number of workers given at pool construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
563
/**
 * The pool maximum size: `max` when it is set, the number of workers otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
570
/**
 * Checks that the worker id carried by a message received from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
589
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
601
/**
 * Gets the worker node key of the worker node whose worker info id equals the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  const hasWorkerId = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
613
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  // Throws on an unknown strategy before anything is changed.
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and send it the statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
632
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given values override the defaults.
  const resolvedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = resolvedOptions
  this.workerChoiceStrategyContext.setOptions(resolvedOptions)
}
646
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Flush the tasks queues when switching from enabled to disabled.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
658
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueMaxSize(builtTasksQueueOptions.size as number)
}
670
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The value assigned to each worker node `tasksQueueBackPressureSize`.
 */
private setTasksQueueMaxSize (size: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
676
/**
 * Builds the tasks queue options: the given options applied over the defaults.
 *
 * The defaults are a `size` of the pool maximum size squared and a `concurrency` of 1.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The built tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: this.maxSize ** 2,
    concurrency: 1
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
688
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes is at least the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
697
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, to be provided by the concrete pool implementation.
 */
protected abstract get busy (): boolean
704
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has spare capacity while it executes fewer tasks
 * than the tasks queue `concurrency`. Without it, a ready worker node has spare capacity when it executes no task.
 *
 * @returns `true` if no ready worker node has spare capacity, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasSpareCapacity =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  return !this.workerNodes.some(hasSpareCapacity)
}
729
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node exposing a non-empty task functions list is used.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
742
/**
 * @inheritDoc
 *
 * The returned promise is rejected, and nothing else is done, when:
 * - the pool is not started,
 * - `name` is given but is not a string or is a blank string,
 * - `transferList` is given but is not an array,
 * - `name` is not in the chosen worker node task functions list.
 *
 * Otherwise the task is registered in the promise response map and either executed right away
 * or enqueued on the chosen worker node.
 */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Each rejection returns immediately: calling reject() does not stop the
    // executor, and an invalid call must not choose a worker node nor dispatch a task.
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // The timestamp is taken before the worker node choice.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so they can be looked up by task id later.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away when the tasks queue is disabled, or when it is enabled
    // with an empty queue and the worker node is below its concurrency; enqueue otherwise.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(workerNodeKey) === 0 &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
805
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy all the worker nodes concurrently and wait for all of them.
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  // The destroy event is emitted before the pool is flagged as not started.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
816
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * A message listener is registered on the worker node before the kill message is sent.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id, sent in the kill message.
 * @returns A promise resolved when a message with `kill` equal to `'success'` is received,
 * rejected when a message with `kill` equal to `'failure'` is received.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      const { kill } = message
      if (kill === 'success') {
        resolve()
        return
      }
      if (kill === 'failure') {
        reject(new Error(`Worker ${workerId} kill message handling failed`))
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
832
/**
 * Terminates the worker node identified by the given worker node key.
 * To be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
839
/**
 * Setup hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /** Intentionally empty */
}
849
/**
 * Should return whether the worker is the main worker or not.
 * The constructor refuses to build the pool when it returns `false`.
 */
protected abstract isMain (): boolean
854
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics of the worker node usage,
 * and of the task function usage when it shall be updated and exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  const taskName = task.name as string
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(taskName) != null
  ) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      taskName
    ) as WorkerUsage
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
884
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics of the worker node usage,
 * and of the task function usage when it shall be updated and exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  // The task function name is read from the task performance of the message.
  const taskName = message.taskPerformance?.name as string
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(taskName) != null
  ) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      taskName
    ) as WorkerUsage
    this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
  }
}
918
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * It shall when the worker info exists and its task functions list holds more than two entries.
 * NOTE(review): presumably the list also holds a default task function entry, hence the threshold of two - confirm.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
933
/**
 * Updates the tasks counters of a worker usage once a task response is received.
 *
 * The executing counter is decremented when it is greater than zero.
 * Then the failed counter is incremented when the message carries a task error, the executed counter otherwise.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
951
/**
 * Updates the run time statistics of a worker usage with the run time carried by the message.
 * Nothing is updated when the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      // A missing run time counts as zero.
      message.taskPerformance?.runTime ?? 0
    )
  }
}
965
/**
 * Updates the wait time statistics of a worker usage with the wait time of the given task.
 *
 * The wait time is the time elapsed between the task timestamp and now, zero when the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const elapsedSinceTaskTimestamp = now - (task.timestamp ?? now)
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    elapsedSinceTaskTimestamp
  )
}
978
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 * Nothing is updated when the message carries a task error.
 *
 * The active and idle statistics are updated with the values carried by the message, zero when missing.
 * When ELU is aggregated and the message carries ELU data, the stored utilization becomes the mean of
 * the previous value and the new one, or the new one when there is no previous value.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (eluRequirements.aggregate && taskElu != null) {
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
1011
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is returned
 * if the strategy policy `dynamicWorkerUsage` is set.
 * In every other case the worker choice strategy context picks the worker node.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy()
    .dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1030
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1039
/**
 * Sends a message to the worker of the worker node identified by the given key.
 * To be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1052
/**
 * Creates a new worker.
 * To be implemented by the concrete pool.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
1059
/**
 * Creates a new, completely set up worker node.
 *
 * The worker is created, the handlers from the pool options and the pool internal `error` and `exit`
 * handlers are registered on it, and it is added to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts

  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', (error) => {
    // Flag the failed worker node as not ready and close its channel.
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(
      failedWorkerNodeKey
    ) as WorkerInfo
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement of the same kind is only created on a started pool.
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true &&
      !this.starting &&
      this.started
    if (shallRestartWorker) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(createdWorkerNodeKey)

  return createdWorkerNodeKey
}
1103
/**
 * Creates a new, completely set up worker node and turns it into a dynamic
 * one.
 *
 * A message listener is registered to destroy the worker node when a kill
 * message is received from the worker, a `checkActive` message is sent to the
 * worker and the worker information is flagged as dynamic.
 *
 * @returns The key of the new, completely set up dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is honored only when the worker has no executing task and,
    // if tasks queueing is enabled, no queued task either.
    const isIdle = (): boolean => {
      if (workerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(localWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isIdle())
    if (shallDestroy) {
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // Some worker choice strategy policies require the dynamic worker to be
  // flagged as ready right away.
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1146
/**
 * Registers a message listener callback on the worker identified by its
 * worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The key of the worker node to listen to.
 * @param listener - The callback invoked with each message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1159
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and statistics
 * messages to the worker and, if tasks queueing is enabled, binds the task
 * stealing callbacks on the worker node.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Task stealing only makes sense with tasks queueing enabled.
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1180
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The key of the worker node to send the startup message to.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1187
/**
 * Sends the statistics message to the worker identified by its worker node
 * key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId: workerInfo.id as number
  })
}
1205
/**
 * Redistributes the tasks queued on the given worker node to the other ready
 * worker nodes.
 *
 * For each queued task, the destination is the first other ready worker node
 * with an empty queue, otherwise the other ready worker node with the fewest
 * queued tasks. The task is executed right away if the destination queue is
 * empty and its number of executing tasks is below the configured
 * concurrency, otherwise it is enqueued on the destination.
 *
 * If no other ready worker node exists, the redistribution stops and the
 * remaining tasks are left in the source queue.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
        // An empty queue cannot be beaten: pick it and stop searching.
        if (workerNode.usage.tasks.queued === 0) {
          destinationWorkerNodeKey = workerNodeId
          break
        }
        if (workerNode.usage.tasks.queued < minQueuedTasks) {
          minQueuedTasks = workerNode.usage.tasks.queued
          destinationWorkerNodeKey = workerNodeId
        }
      }
    }
    if (destinationWorkerNodeKey == null) {
      // No eligible destination: nothing would be dequeued, so looping again
      // would never terminate.
      break
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (
      this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
      destinationWorkerNode.usage.tasks.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1240
/**
 * Steals one task for the worker identified by the given id from another
 * worker node.
 *
 * The worker nodes are scanned by decreasing number of queued tasks. The
 * first ready worker node other than the destination with at least one
 * queued task gets one task popped. That task is executed right away on the
 * destination if its queue is empty and its number of executing tasks is
 * below the configured concurrency, otherwise it is enqueued on the
 * destination. The stolen tasks counters of the destination are then
 * incremented.
 *
 * @param workerId - The id of the worker that will receive the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const thiefWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const thiefWorkerNode = this.workerNodes[thiefWorkerNodeKey]
  const workerNodesByQueuedDesc = [...this.workerNodes].sort(
    (workerNodeA, workerNodeB) =>
      workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
  )
  for (const victimWorkerNode of workerNodesByQueuedDesc) {
    // Sorted by decreasing queue size: nothing left to steal past this point.
    if (victimWorkerNode.usage.tasks.queued === 0) {
      break
    }
    if (!victimWorkerNode.info.ready || victimWorkerNode.info.id === workerId) {
      continue
    }
    const stolenTask = {
      ...(victimWorkerNode.popTask() as Task<Data>),
      workerId: thiefWorkerNode.info.id as number
    }
    const canExecuteNow =
      this.tasksQueueSize(thiefWorkerNodeKey) === 0 &&
      thiefWorkerNode.usage.tasks.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
    if (canExecuteNow) {
      this.executeTask(thiefWorkerNodeKey, stolenTask)
    } else {
      this.enqueueTask(thiefWorkerNodeKey, stolenTask)
    }
    ++thiefWorkerNode.usage.tasks.stolen
    if (this.shallUpdateTaskFunctionWorkerUsage(thiefWorkerNodeKey)) {
      const taskFunctionWorkerUsage = thiefWorkerNode.getTaskFunctionWorkerUsage(
        stolenTask.name as string
      ) as WorkerUsage
      ++taskFunctionWorkerUsage.tasks.stolen
    }
    // Only one task is stolen per call.
    break
  }
}
1284
/**
 * Offloads queued tasks from the worker identified by the given id to the
 * other worker nodes.
 *
 * The worker nodes are scanned by increasing number of queued tasks. Each
 * ready worker node other than the source, whose number of queued tasks is
 * below the configured tasks queue size minus one, receives one task popped
 * from the source, as long as the source still has queued tasks. The task is
 * executed right away if the receiving queue is empty and its number of
 * executing tasks is below the configured concurrency, otherwise it is
 * enqueued. The stolen tasks counters of the receiving worker node are then
 * incremented.
 *
 * @param workerId - The id of the worker whose queued tasks are offloaded.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    // Nothing left to offload.
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      break
    }
    if (
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - 1
    ) {
      // The position in the sorted copy is not a valid worker node key:
      // resolve the key from the pool worker nodes array itself.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (
        this.tasksQueueSize(workerNodeKey) === 0 &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
      ) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      ++workerNode.usage.tasks.stolen
      if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
        const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
          task.name as string
        ) as WorkerUsage
        ++taskFunctionWorkerUsage.tasks.stolen
      }
    }
  }
}
1325
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches the message:
 * worker ready response, task execution response or task functions message.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions, workerId } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(workerId)
      ) as WorkerInfo
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1350
/**
 * Handles a worker ready response.
 *
 * Stores the ready flag and the task functions carried by the message in the
 * worker information, then emits the `PoolEvents.ready` event if the pool has
 * an emitter and is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message `ready` flag is `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  if (this.emitter == null) {
    return
  }
  if (this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1364
/**
 * Handles a task execution response.
 *
 * Settles the promise registered for the task id, runs the after task
 * execution hook, forgets the promise, executes the next queued task of the
 * worker node if tasks queueing is enabled and the concurrency allows it, and
 * finally updates the worker choice strategy context. Messages whose task id
 * has no registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (taskError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1392
/**
 * Emits the `PoolEvents.busy` event with the pool information if the pool is
 * busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1398
/**
 * Emits the `PoolEvents.backPressure` event with the pool information if the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1404
/**
 * Emits the `PoolEvents.full` event with the pool information if the pool
 * type is dynamic and the pool is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1412
/**
 * Gets the information of the worker held by the worker node at the given
 * key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` if there is no worker node at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1422
/**
 * Wraps the given worker in a worker node and appends it to the pool worker
 * nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Defaults to the square of the pool maximum size when the tasks queue
  // size option is unset.
  const queueSize =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const workerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    queueSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker node added not found')
  }
  return addedWorkerNodeKey
}
1447
/**
 * Removes the worker node holding the given worker from the pool worker nodes
 * and from the worker choice strategy context.
 * Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1460
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queueing there is no queue to put pressure on.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1468
/**
 * Tells whether the pool has back pressure.
 *
 * @returns `true` if tasks queueing is enabled and no worker node is without back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1477
/**
 * Executes the given task on the worker identified by its worker node key.
 *
 * Runs the before task execution hook, sends the task to the worker along
 * with its transfer list, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1489
/**
 * Enqueues the given task on the worker node at the given key, then checks
 * and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` (presumably its tasks queue size after enqueuing).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queuedTasksCount = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queuedTasksCount
}
1495
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1499
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1503
/**
 * Flushes the tasks queue of the worker node at the given key: executes every
 * queued task on that worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.workerNodes[workerNodeKey].tasksQueueSize() > 0) {
    const queuedTask = this.workerNodes[workerNodeKey].dequeueTask()
    this.executeTask(workerNodeKey, queuedTask as Task<Data>)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1513
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1519 }