Merge dependabot/npm_and_yarn/examples/typescript/http-server-pool/fastify-worker_thr...
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 average,
15 isKillBehavior,
16 isPlainObject,
17 max,
18 median,
19 min,
20 round,
21 updateMeasurementStatistics
22 } from '../utils'
23 import { KillBehaviors } from '../worker/worker-options'
24 import {
25 type IPool,
26 PoolEmitter,
27 PoolEvents,
28 type PoolInfo,
29 type PoolOptions,
30 type PoolType,
31 PoolTypes,
32 type TasksQueueOptions
33 } from './pool'
34 import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerType,
39 WorkerUsage
40 } from './worker'
41 import {
42 type MeasurementStatisticsRequirements,
43 Measurements,
44 WorkerChoiceStrategies,
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
47 } from './selection-strategies/selection-strategies-types'
48 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
49 import { version } from './version'
50 import { WorkerNode } from './worker-node'
51
52 /**
53 * Base class that implements some shared logic for all poolifier pools.
54 *
55 * @typeParam Worker - Type of worker which manages this pool.
56 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
57 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
58 */
59 export abstract class AbstractPool<
60 Worker extends IWorker,
61 Data = unknown,
62 Response = unknown
63 > implements IPool<Worker, Data, Response> {
64 /** @inheritDoc */
65 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
66
67 /** @inheritDoc */
68 public readonly emitter?: PoolEmitter
69
70 /**
71 * The task execution response promise map:
72 * - `key`: The message id of each submitted task.
73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
74 *
75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
76 */
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
79
80 /**
81 * Worker choice strategy context referencing a worker choice algorithm implementation.
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
84 Worker,
85 Data,
86 Response
87 >
88
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
94 /**
95 * Whether the pool is started or not.
96 */
97 private started: boolean
98 /**
99 * Whether the pool is starting or not.
100 */
101 private starting: boolean
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
/**
 * Creates the pool: validates the arguments, applies the option defaults,
 * wires the worker choice strategy context and, when the `startWorkers`
 * option is enabled, starts the workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const instantiatedFromMain = this.isMain()
  if (!instantiatedFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. checkPoolOptions() also writes the defaults into the options.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Rebind the task dispatching methods to this pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.started = false
  this.starting = false
  if (opts.startWorkers === true) {
    this.start()
  }

  // Taken once the (optional) workers start up has been triggered.
  this.startTimestamp = performance.now()
}
155
/**
 * Checks that the worker file path is a non-blank string referencing an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, is not a string, is blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
168
/**
 * Checks the number of workers given to the constructor.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused an empty set of workers.
  if (numberOfWorkers === 0 && this.type === PoolTypes.fixed) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
186
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is missing or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is lower than the minimum size, is zero or is equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
212
/**
 * Checks the pool options and writes the default values of the unset ones
 * into the pool options.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validate first, then default to round robin.
  this.checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  poolOptions.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // The tasks queue options are only validated and built when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
243
/**
 * Checks that the worker choice strategy, when given, is one of the known strategies.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(
      `Invalid worker choice strategy '${workerChoiceStrategy}'`
    )
  }
}
256
/**
 * Checks the worker choice strategy options. Nothing is checked when they are not given.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from the pool maximum size or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node of a pool at its maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
303
/**
 * Checks the tasks queue options. Nothing is checked when they are not given.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
340
/** @inheritDoc */
public get info (): PoolInfo {
  // Resolved once per snapshot instead of once per reported field.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Counts the worker nodes matching the given predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) =>
        predicate(workerNode) ? accumulator + 1 : accumulator,
      0
    )

  // Sums one tasks counter over all the worker nodes.
  const sumTasks = (
    selector: (tasks: WorkerUsage['tasks']) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + selector(workerNode.usage.tasks),
      0
    )

  // Builds the pool wide statistics of one measurement (run time or wait time).
  const buildMeasurementInfo = (
    requirements: MeasurementStatisticsRequirements,
    select: (usage: WorkerUsage) => WorkerUsage['runTime']
  ): NonNullable<PoolInfo['runTime']> => {
    // A fresh merged history is built for each statistic computed from it.
    const mergeHistories = (): number[] =>
      this.workerNodes.reduce<number[]>(
        (accumulator, workerNode) =>
          accumulator.concat(select(workerNode.usage).history),
        []
      )
    return {
      minimum: round(
        min(
          ...this.workerNodes.map(
            workerNode => select(workerNode.usage)?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        max(
          ...this.workerNodes.map(
            workerNode => select(workerNode.usage)?.maximum ?? -Infinity
          )
        )
      ),
      ...(requirements.average && {
        average: round(average(mergeHistories()))
      }),
      ...(requirements.median && {
        median: round(median(mergeHistories()))
      })
    }
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // The utilization needs both the run time and the wait time aggregates.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumTasks(tasks => tasks.executed),
    executingTasks: sumTasks(tasks => tasks.executing),
    // Tasks queue related fields are only reported when the queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumTasks(tasks => tasks.queued),
      maxQueuedTasks: sumTasks(tasks => tasks?.maxQueued ?? 0),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumTasks(tasks => tasks.stolen)
    }),
    failedTasks: sumTasks(tasks => tasks.failed),
    ...(runTimeRequirements.aggregate && {
      runTime: buildMeasurementInfo(
        runTimeRequirements,
        usage => usage.runTime
      )
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: buildMeasurementInfo(
        waitTimeRequirements,
        usage => usage.waitTime
      )
    })
  }
}
496
/**
 * The pool readiness boolean status: `true` once the number of ready,
 * non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    ({ info }) => !info.dynamic && info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
511
/**
 * The approximate pool utilization: the aggregated tasks run time and wait time
 * of all the worker nodes, divided by the time elapsed since the start timestamp
 * multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
532
533 /**
534 * The pool type, to be provided by the concrete pool implementation.
535 *
536 * It is compared against `PoolTypes.fixed` and `PoolTypes.dynamic` by the pool size checks and by the dynamic worker creation condition, and reported in the pool `info`. If it is `'dynamic'`, it provides the `max` property.
537 */
538 protected abstract get type (): PoolType
539
540 /**
541 * The worker type, to be provided by the concrete pool implementation. It is reported as `worker` in the pool `info`.
542 */
543 protected abstract get worker (): WorkerType
544
545 /**
546 * The pool minimum size: the number of workers given to the constructor. The `ready` status requires that many ready, non dynamic worker nodes.
547 */
548 protected get minSize (): number {
549 return this.numberOfWorkers
550 }
551
/**
 * The pool maximum size: the `max` property when it is set, the number of
 * workers given to the constructor otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max != null ? max : numberOfWorkers
}
558
/**
 * Checks that the message received from a worker carries the id of a worker known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
577
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
589
/**
 * Looks up the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
601
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node usage is reset, then the statistics message is sent to its worker.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
620
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered over the defaults before being stored and forwarded.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
634
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isDisablingQueue = this.opts.enableTasksQueue === true && !enable
  if (isDisablingQueue) {
    // Switching off: remove the stealing callbacks, then flush the tasks queues.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
648
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Without tasks queue, previously stored tasks queue options are dropped.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const effectiveOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = effectiveOptions
  this.setTasksQueueSize(effectiveOptions.size as number)
  if (effectiveOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (effectiveOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
670
/**
 * Builds the effective tasks queue options: the given options layered over the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options to layer over the defaults.
 * @returns The effective tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    // The default queue size is the square of the pool maximum size.
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
684
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The size to set.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
690
/**
 * Installs the task stealing callback as the `onEmptyQueue` callback of every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
697
/**
 * Removes the `onEmptyQueue` callback from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
703
/**
 * Installs the tasks stealing callback as the `onBackPressure` callback of every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
710
/**
 * Removes the `onBackPressure` callback from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
716
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least as many worker nodes as its maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
725
726 /**
727 * Whether the pool is busy or not.
728 *
729 * The pool busyness boolean status, to be provided by the concrete pool implementation. NOTE(review): implementations presumably build on `internalBusy()` - confirm in the subclasses.
730 */
731 protected abstract get busy (): boolean
732
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has spare capacity while it
 * executes fewer tasks than the tasks queue concurrency; without it, while it
 * executes no task at all.
 *
 * @returns Worker nodes busyness boolean status: `true` if no ready worker node has spare capacity.
 */
protected internalBusy (): boolean {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const hasSpareCapacity = (
    workerNode: IWorkerNode<Worker, Data>
  ): boolean => {
    if (!workerNode.info.ready) {
      return false
    }
    const { executing } = workerNode.usage.tasks
    return tasksQueueEnabled
      ? executing < (this.opts.tasksQueueOptions?.concurrency as number)
      : executing === 0
  }
  return !this.workerNodes.some(hasSpareCapacity)
}
756
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node advertising a non empty task functions list is used.
  const advertisingWorkerNode = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctions) && info.taskFunctions.length > 0
  )
  return advertisingWorkerNode?.info.taskFunctions ?? []
}
769
/**
 * Whether a task can be executed right away on the given worker node: its tasks
 * queue must be empty and it must execute fewer tasks than the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` if it shall be enqueued.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return executing < (this.opts.tasksQueueOptions?.concurrency as number)
}
777
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Arguments and pool state validation: any failure rejects the returned promise.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }

    const submissionTimestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp: submissionTimestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }
    // The promise callbacks are stored under the task id in the promise response map.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })

    const tasksQueueEnabled = this.opts.enableTasksQueue
    const executeNow =
      tasksQueueEnabled === false ||
      (tasksQueueEnabled === true && this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
832
/** @inheritdoc */
public start (): void {
  // Dynamic worker nodes do not count towards the number of workers to start.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length

  this.starting = true
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
848
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All the worker nodes are destroyed concurrently.
  const workerNodesDestruction = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestruction)
  // The destroy event is emitted before the pool is flagged as not started.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
859
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id, put in the kill message and in the failure error message.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, ({ kill }) => {
      switch (kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
875
876 /**
877 * Terminates the worker node given its worker node key. To be provided by the concrete pool implementation; `destroy()` awaits it for every worker node.
878 *
879 * @param workerNodeKey - The worker node key.
880 */
881 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
882
883 /**
884 * Setup hook to execute code before worker nodes are created in the abstract constructor.
885 * Can be overridden. The constructor calls it once the worker choice strategy context is created and before the pool is optionally started. The default implementation does nothing.
886 *
887 * @virtual
888 */
889 protected setupHook (): void {
890 /* Intentionally empty */
891 }
892
893 /**
894 * Should return whether the worker is the main worker or not. The constructor calls it first and throws when it returns `false`.
895 */
896 protected abstract isMain (): boolean
897
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics
 * of the worker node usage and, when the worker node tracks task function
 * usages, of the usage of the task function about to be executed.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    const workerUsage = this.workerNodes[workerNodeKey].usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Looked up once: the null check narrows the type without a type assertion.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(task.name as string)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.executing
      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
    }
  }
}
927
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks counters, the run time and the event loop utilization (ELU)
 * statistics of the worker node usage and, when the worker node tracks task
 * function usages, of the usage of the task function named in the message
 * task performance.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    const workerUsage = this.workerNodes[workerNodeKey].usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Looked up once: the null check narrows the type without a type assertion.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
    if (taskFunctionWorkerUsage != null) {
      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
    }
  }
}
961
/**
 * Whether the worker node shall update its task function worker usage or not:
 * its worker info must list more than two task functions.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
976
/**
 * Updates the tasks counters of a worker usage once a task response is received:
 * one executing task less (never below zero) and one executed or failed task more.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  const taskFailed = message.taskError != null
  if (taskFailed) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
994
/**
 * Updates the run time statistics of a worker usage with the task run time
 * carried by the message. Nothing is updated when the task failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    const { runTime: runTimeRequirements } =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
1008
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp. The wait time is zero when the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskSubmissionTimestamp = task.timestamp ?? now
  const { waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - taskSubmissionTimestamp
  )
}
1021
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with the
 * ELU carried by the message. Nothing is updated when the task failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // The stored utilization is the mean of the previous value and the task one.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1054
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and used right away
 * if the worker choice strategy policy allows it. Otherwise the worker choice
 * strategy decides.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1073
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full
 * and its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1082
1083 /**
1084 * Sends a message to worker given its worker node key. To be provided by the concrete pool implementation; `sendKillMessageToWorker()` uses it to deliver the kill message.
1085 *
1086 * @param workerNodeKey - The worker node key.
1087 * @param message - The message.
1088 * @param transferList - The optional array of transferable objects.
1089 */
1090 protected abstract sendToWorker (
1091 workerNodeKey: number,
1092 message: MessageValue<Data>,
1093 transferList?: TransferListItem[]
1094 ): void
1095
/**
 * Instantiates a new worker.
 * Must be implemented by the concrete pool.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
1102
/**
 * Creates a worker, wires its event handlers, registers it as a worker node
 * and runs the post-setup hook.
 *
 * @returns The key of the new, completely set up worker node.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts

  // User supplied handlers are registered first, pool internal ones afterwards.
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    // Flag the worker as not ready before anything else reacts to the error.
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker) {
      // Replace the failed worker by one of the same kind.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // Drop the worker node once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const newWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(newWorkerNodeKey)
  return newWorkerNodeKey
}
1146
/**
 * Creates a worker node, registers the kill message listener on it, asks the worker
 * to check its activity and flags the worker node as dynamic.
 *
 * @returns The key of the new, completely set up dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const dynamicWorkerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(dynamicWorkerNodeKey, message => {
    const messageWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const usage = this.workerNodes[messageWorkerNodeKey].usage
    // A soft kill is only honored when the worker has nothing left to do.
    const softKillAllowed = (): boolean => {
      switch (this.opts.enableTasksQueue) {
        case false:
          return usage.tasks.executing === 0
        case true:
          return (
            usage.tasks.executing === 0 &&
            this.tasksQueueSize(messageWorkerNodeKey) === 0
          )
        default:
          return false
      }
    }
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && softKillAllowed())
    ) {
      this.destroyWorkerNode(messageWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(dynamicWorkerNodeKey)
  this.sendToWorker(dynamicWorkerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  // Some strategy policies require the dynamic worker to be usable right away.
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return dynamicWorkerNodeKey
}
1189
/**
 * Attaches a message listener callback to the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1202
/**
 * Hook run once a worker node has been newly created: registers the pool message
 * listener, sends the startup and statistics messages and, when the tasks queue is
 * enabled, installs the task stealing callbacks requested by the tasks queue options.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].onEmptyQueue =
      this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].onBackPressure =
      this.tasksStealingOnBackPressure.bind(this)
  }
}
1227
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1234
/**
 * Tells the worker identified by its worker node key which task statistics
 * (run time, event loop utilization) the current worker choice strategy needs aggregated.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    },
    workerId
  })
}
1252
/**
 * Moves the tasks queued on the given worker node to other worker nodes.
 *
 * For each queued task, the destination is the ready worker node, other than the
 * source one, having the fewest queued tasks. The task is executed immediately when
 * `shallExecuteTask()` allows it on the destination, enqueued there otherwise.
 * Redistribution stops as soon as no other ready worker node is available.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      // Never pick the source worker node (re-enqueuing a task on it would make this
      // loop spin forever) nor a worker node that is not ready.
      if (
        candidateWorkerNodeKey === workerNodeKey ||
        !candidateWorkerNode.info.ready
      ) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidateWorkerNode.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: leave the remaining tasks where they are.
      return
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    // Re-tag the task with the id of the worker that will now handle it.
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1277
/**
 * Increments the stolen tasks counters of a worker node: the worker usage one and,
 * when task function usage shall be updated and exists for the task name, the task function one.
 *
 * @param workerNodeKey - The key of the worker node that stole the task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealerWorkerNode = this.workerNodes[workerNodeKey]
  if (stealerWorkerNode?.usage != null) {
    ++stealerWorkerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (stealerWorkerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionUsage = stealerWorkerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionUsage.tasks.stolen
}
1296
/**
 * Steals one task for the worker with the given id from the ready worker node,
 * other than itself, having the most queued tasks. Does nothing if no such
 * worker node has queued tasks.
 *
 * @param workerId - The id of the worker that will receive the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const stealerWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const stealerWorkerNode = this.workerNodes[stealerWorkerNodeKey]
  // Work on a copy ordered by decreasing number of queued tasks.
  const byQueuedTasksDescending = [...this.workerNodes].sort(
    (nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued
  )
  const victimWorkerNode = byQueuedTasksDescending.find(
    node =>
      node.info.ready &&
      node.info.id !== workerId &&
      node.usage.tasks.queued > 0
  )
  if (victimWorkerNode == null) {
    return
  }
  // Re-tag the stolen task with the id of the worker that will now handle it.
  const stolenTask = {
    ...(victimWorkerNode.popTask() as Task<Data>),
    workerId: stealerWorkerNode.info.id as number
  }
  if (this.shallExecuteTask(stealerWorkerNodeKey)) {
    this.executeTask(stealerWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(stealerWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    stealerWorkerNodeKey,
    stolenTask.name as string
  )
}
1328
/**
 * Offloads tasks from the worker node under back pressure to the other ready worker
 * nodes, visited by increasing number of queued tasks. A worker node receives a task
 * only while its queued tasks count is below the configured tasks queue size minus one.
 * Does nothing when the configured tasks queue size is lower than or equal to one.
 *
 * @param workerId - The id of the worker whose worker node is under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Work on a copy ordered by increasing number of queued tasks.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      // The position in the sorted copy is not a worker node key: resolve the real
      // key in the pool worker nodes so the task lands on the worker it is tagged for.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1366
/**
 * Builds the listener registered for each worker message. The listener checks the
 * message worker id, then dispatches on the message content: worker ready response,
 * task execution response or task functions update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskFunctions, taskId, workerId } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(workerId)
      )
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1389
/**
 * Handles a worker ready response: records the worker readiness and task functions,
 * then emits the pool ready event if the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const readyWorkerInfo = this.getWorkerInfo(
    this.getWorkerNodeKeyByWorkerId(workerId)
  )
  readyWorkerInfo.ready = ready as boolean
  readyWorkerInfo.taskFunctions = taskFunctions
  if (this.ready) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1403
/**
 * Handles a task execution response: settles the matching pending promise, runs the
 * after task execution hook, updates the worker choice strategy context, forgets the
 * promise and, when the tasks queue is enabled, starts the next queued task if the
 * worker node is below its configured concurrency.
 *
 * Messages whose task id matches no pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const pendingPromise = this.promiseResponseMap.get(taskId as string)
  if (pendingPromise == null) {
    return
  }
  if (taskError == null) {
    pendingPromise.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingPromise.reject(taskError.message)
  }
  const { workerNodeKey } = pendingPromise
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
1431
/**
 * Emits the pool busy event, carrying the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1437
/**
 * Emits the pool back pressure event, carrying the pool information, when the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1443
/**
 * Emits the pool full event, carrying the pool information, when the pool is of
 * dynamic type and is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1451
/**
 * Reads the information of the worker attached to the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1461
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 * The tasks queue size passed to the worker node is the configured one, defaulting
 * to the square of the pool maximum size.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueMaxSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
1485
/**
 * Removes the worker node of the given worker from the pool worker nodes and from
 * the worker choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1498
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queue there is never back pressure.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1506
/**
 * Tells whether the pool has back pressure: the tasks queue must be enabled and
 * every worker node must have back pressure.
 *
 * @returns Whether the pool has back pressure or not.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1515
/**
 * Runs the before task execution hook, sends the task to the worker identified by
 * its worker node key, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1527
/**
 * Enqueues the task on the worker node with the given key, then checks and emits
 * the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The tasks queue size returned by the worker node.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1533
/**
 * Dequeues a task from the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
1537
/**
 * Gets the tasks queue size of the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const queriedWorkerNode = this.workerNodes[workerNodeKey]
  return queriedWorkerNode.tasksQueueSize()
}
1541
/**
 * Executes every task queued on the worker node with the given key, then clears
 * its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1551
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1557 }