fix: fix back pressure detection
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
96 /**
97 * Constructs a new poolifier pool.
98 *
99 * @param numberOfWorkers - Number of workers that this pool should manage.
100 * @param filePath - Path to the worker file.
101 * @param opts - Options for the pool.
102 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // A pool may only be created where isMain() reports true.
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Validate the arguments; each check throws on invalid input and
  // checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the methods that are handed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitTaskExecutionEvents =
    this.checkAndEmitTaskExecutionEvents.bind(this)
  this.checkAndEmitTaskQueuingEvents =
    this.checkAndEmitTaskQueuingEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, opts.workerChoiceStrategy, opts.workerChoiceStrategyOptions)

  this.setupHook()

  // The `starting` flag is raised only while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
147
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  // `typeof null` is 'object', so nullish values are rejected by the type test too.
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (existsSync(filePath)) {
    return
  }
  throw new Error(`Cannot find the worker file '${filePath}'`)
}
160
/**
 * Checks that the number of workers is a safe, non negative integer,
 * and non zero for a fixed pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused a zero worker count here.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
178
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
204
/**
 * Validates the pool options and stores the defaulted values on `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypeError} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  // Worker choice strategy: defaults to round robin.
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  // Worker choice strategy options: user values override the defaults.
  poolOptions.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
232
/**
 * Checks that the worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
242
/**
 * Checks the worker choice strategy options: plain object, choice retries,
 * weights count and measurement.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // The number of weights is compared against the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
286
/**
 * Checks the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(
      `Invalid worker tasks concurrency '${concurrency}' is a negative integer or zero`
    )
  }
}
310
/**
 * Creates worker nodes until the count of non dynamic worker nodes
 * reaches the configured number of workers.
 */
private startPool (): void {
  // Recomputed on every iteration since each creation changes `workerNodes`.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
322
323 /** @inheritDoc */
public get info (): PoolInfo {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Sums, over all worker nodes, the number picked from each worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )

  // Counts the worker nodes matching the predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) =>
        predicate(workerNode) ? accumulator + 1 : accumulator,
      0
    )

  // Builds the pool wide statistics of one time measurement.
  const buildMeasurementStatistics = (
    measurement: 'runTime' | 'waitTime'
  ): NonNullable<PoolInfo['runTime']> => {
    const aggregate = sumOverWorkerNodes(
      (workerNode) => workerNode.usage[measurement]?.aggregate ?? 0
    )
    const executed = sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks?.executed ?? 0
    )
    return {
      minimum: round(
        Math.min(
          ...this.workerNodes.map(
            (workerNode) => workerNode.usage[measurement]?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        Math.max(
          ...this.workerNodes.map(
            (workerNode) =>
              workerNode.usage[measurement]?.maximum ?? -Infinity
          )
        )
      ),
      // Before any task has been executed the division would be 0 / 0, i.e.
      // NaN: report an average of 0 instead.
      average: round(executed > 0 ? aggregate / executed : 0),
      ...(taskStatisticsRequirements[measurement].median && {
        median: round(
          median(
            this.workerNodes.map(
              (workerNode) => workerNode.usage[measurement]?.median ?? 0
            )
          )
        )
      })
    }
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Queue related fields are only present when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: buildMeasurementStatistics('runTime')
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: buildMeasurementStatistics('waitTime')
    })
  }
}
466
467 /**
468 * The pool readiness boolean status.
469 */
private get ready (): boolean {
  // Only non dynamic worker nodes that are ready count towards readiness.
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
481
482 /**
483 * The approximate pool utilization.
484 *
485 * @returns The pool utilization.
486 */
private get utilization (): number {
  // Time elapsed since the pool start, multiplied by the maximum pool size.
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
502
503 /**
504 * The pool type.
505 *
506 * If it is `'dynamic'`, it provides the `max` property.
507 */
508 protected abstract get type (): PoolType
509
510 /**
511 * The worker type.
512 */
513 protected abstract get worker (): WorkerType
514
515 /**
516 * The pool minimum size.
517 */
518 protected abstract get minSize (): number
519
520 /**
521 * The pool maximum size.
522 */
523 protected abstract get maxSize (): number
524
525 /**
526 * Checks if the worker id sent in the received message from a worker is valid.
527 *
528 * @param message - The received message.
529 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
530 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // A worker node key of -1 means no worker node has this worker id.
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
543
544 /**
545 * Gets the given worker its worker node key.
546 *
547 * @param worker - The worker.
548 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
549 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  // Linear scan; the first worker node holding this worker instance wins.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
555
556 /**
557 * Gets the worker node key given its worker id.
558 *
559 * @param workerId - The worker id.
560 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
561 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  // Linear scan; the first worker node whose info id matches wins.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
567
568 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and is sent a new statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
586
587 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given values override the defaults; the same merged object is stored
  // in the pool options and handed to the strategy context.
  const mergedOptions: WorkerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
600
601 /** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush the tasks queues first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
612
613 /** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
623
/**
 * Builds the tasks queue options, defaulting the concurrency to 1.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly nullish.
 * @returns The tasks queue options with a defined concurrency.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
631
632 /**
633 * Whether the pool is full or not.
634 *
635 * The pool filling boolean status.
636 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
640
641 /**
642 * Whether the pool is busy or not.
643 *
644 * The pool busyness boolean status.
645 */
646 protected abstract get busy (): boolean
647
648 /**
649 * Whether worker nodes are executing concurrently their tasks quota or not.
650 *
651 * @returns Worker nodes busyness boolean status.
652 */
protected internalBusy (): boolean {
  // With the tasks queue enabled a ready worker node has room while it executes
  // fewer tasks than the configured concurrency; otherwise only while it
  // executes none.
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        (workerNode) =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        (workerNode) =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
672
673 /** @inheritDoc */
public listTaskFunctions (): string[] {
  // The list comes from the first worker node that reports a non empty one.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
685
686 /** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // reject() only settles the promise, it does not stop this executor.
    // Each validation failure therefore returns immediately, so that no worker
    // node is chosen and no task is executed or enqueued for a rejected call.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // The timestamp is taken before the worker node choice.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // Register the promise callbacks under the task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the chosen
    // worker node already executes its concurrency quota.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
744
745 /** @inheritDoc */
public async destroy (): Promise<void> {
  // Start the destruction of every worker node, then wait for all of them.
  const destructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(destructions)
  this.emitter?.emit(PoolEvents.destroy)
}
754
/**
 * Sends a kill message to the worker and waits for its kill status message.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id put in the kill message.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
770
771 /**
772 * Terminates the worker node given its worker node key.
773 *
774 * @param workerNodeKey - The worker node key.
775 */
776 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
777
778 /**
779 * Setup hook to execute code before worker nodes are created in the abstract constructor.
780 * Can be overridden.
781 *
782 * @virtual
783 */
protected setupHook (): void {
  /* No-op in the base class. */
}
787
788 /**
789 * Should return whether the worker is the main worker or not.
790 */
791 protected abstract isMain (): boolean
792
793 /**
794 * Hook executed before the worker task execution.
795 * Can be overridden.
796 *
797 * @param workerNodeKey - The worker node key.
798 * @param task - The task to execute.
799 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Marks one more task as executing and records the task wait time.
  const markTaskStarted = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  markTaskStarted(workerNode.usage)
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Same bookkeeping on the usage tracked for this task function.
    markTaskStarted(
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        task.name as string
      ) as WorkerUsage
    )
  }
}
815
816 /**
817 * Hook executed after the worker task execution.
818 * Can be overridden.
819 *
820 * @param workerNodeKey - The worker node key.
821 * @param message - The received message.
822 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the tasks, run time and ELU statistics updates, in that order.
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  recordTaskEnd(this.workerNodes[workerNodeKey].usage)
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // The task function name falls back to the default task name.
    const taskFunctionName =
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    recordTaskEnd(
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        taskFunctionName
      ) as WorkerUsage
    )
  }
}
842
843 /**
844 * Whether the worker node shall update its task function worker usage or not.
845 *
846 * @param workerNodeKey - The worker node key.
847 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
848 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const { taskFunctions } = this.getWorkerInfo(workerNodeKey)
  // NOTE(review): the threshold is strictly more than two names; presumably
  // the list also carries the default task name - confirm against the worker side.
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
856
/**
 * Updates the tasks counters of a worker usage after a task execution.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the executing tasks counter is negative.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null) {
    if (tasks.executing > 0) {
      --tasks.executing
    } else if (tasks.executing < 0) {
      throw new Error(
        'Worker usage statistic for tasks executing cannot be negative'
      )
    }
  }
  // A message carrying a task error counts as a failed task.
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
881
/**
 * Updates the run time statistics of a worker usage from the received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  // A message without task performance counts as a run time of 0.
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
893
/**
 * Updates the wait time statistics of a worker usage for the given task.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp has a wait time of 0.
  const taskWaitTime = task.timestamp != null ? now - task.timestamp : 0
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
907
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage
 * from the received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  const eluUsage = workerUsage.elu
  updateMeasurementStatistics(
    eluUsage.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    eluUsage.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The first value is stored as is; later ones are averaged with the
  // current utilization.
  eluUsage.utilization =
    eluUsage.utilization != null
      ? (eluUsage.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
939
940 /**
941 * Chooses a worker node for the next task.
942 *
943 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
944 *
945 * @returns The chosen worker node key
946 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  // The dynamic worker node is created either way; it is returned directly
  // only when the strategy policy says to use it.
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
958
959 /**
960 * Conditions for dynamic worker creation.
961 *
962 * @returns Whether to create a dynamic worker or not.
963 */
private shallCreateDynamicWorker (): boolean {
  // Evaluated in this order: dynamic pool, not full, worker nodes busy.
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
967
968 /**
969 * Sends a message to worker given its worker node key.
970 *
971 * @param workerNodeKey - The worker node key.
972 * @param message - The message.
973 * @param transferList - The optional array of transferable objects.
974 */
975 protected abstract sendToWorker (
976 workerNodeKey: number,
977 message: MessageValue<Data>,
978 transferList?: TransferListItem[]
979 ): void
980
981 /**
982 * Creates a new worker.
983 *
984 * @returns Newly created worker.
985 */
986 protected abstract createWorker (): Worker
987
988 /**
989 * Creates a new, completely set up worker node.
990 *
991 * @returns New, completely set up worker node key.
992 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts

  // User supplied handlers, with a no-op fallback.
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Pool internal error handling.
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No restart while the pool is starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestart) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const newWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(newWorkerNodeKey)
  return newWorkerNodeKey
}
1027
1028 /**
1029 * Creates a new, completely set up dynamic worker node.
1030 *
1031 * @returns New, completely set up dynamic worker node key.
1032 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    // The worker node key is looked up again from the worker id in the message.
    const messageWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const { tasks } = this.workerNodes[messageWorkerNodeKey].usage
    // A soft kill is honoured only when the worker node executes no task and,
    // with the tasks queue enabled, has an empty tasks queue.
    const softKillAllowed = (): boolean => {
      if (!isKillBehavior(KillBehaviors.SOFT, message.kill)) {
        return false
      }
      if (tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(messageWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker
    if (isKillBehavior(KillBehaviors.HARD, message.kill) || softKillAllowed()) {
      this.destroyWorkerNode(messageWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  // Flagged ready at once when the strategy policy uses the dynamic worker.
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    dynamicWorkerInfo.ready = true
  }
  return workerNodeKey
}
1066
/**
 * Attaches a message listener to the worker identified by its worker node key.
 * Concrete pools must provide the implementation.
 *
 * @param workerNodeKey - The key of the worker node whose worker receives the listener.
 * @param listener - The callback invoked with each message value handed to it.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(workerNodeKey: number, listener: (message: MessageValue<Message>) => void): void
1079
/**
 * Hook invoked once a worker node has been newly created.
 * Subclasses may override it.
 *
 * It registers the pool message listener on the worker, then sends it the startup message
 * followed by the statistics message.
 *
 * @param workerNodeKey - The key of the newly created worker node.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, messageListener)
  // Send the startup message, then the statistics message, to the worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
}
1094
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Concrete pools must provide the implementation.
 *
 * @param workerNodeKey - The key of the worker node whose worker receives the startup message.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1101
/**
 * Sends the statistics message to the worker identified by its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task statistics
 * requirements reported by the worker choice strategy context, plus the worker id.
 *
 * @param workerNodeKey - The key of the worker node whose worker receives the statistics message.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    },
    workerId
  })
}
1119
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the target is the first other ready worker node with an empty tasks
 * queue; the task is executed on it right away if that node runs fewer tasks than the
 * configured tasks queue concurrency, otherwise it is enqueued on it. When no ready worker
 * node has an empty queue, the task is enqueued on the ready worker node with the fewest
 * queued tasks.
 *
 * If there is no other ready worker node, redistribution stops and the remaining tasks stay
 * queued on the given worker node: re-enqueuing them on their own node would never shrink its
 * queue and the loop would never terminate.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only other, ready worker nodes are eligible targets.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const { executing, queued } = workerNode.usage.tasks
      if (queued === 0) {
        // Empty queue: best possible target, run the task immediately if concurrency allows.
        executeTask =
          executing < (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queued < minQueuedTasks) {
        minQueuedTasks = queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No eligible target: bail out instead of looping forever on the same worker node.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1163
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the message worker id, then dispatches on the message content:
 * a worker ready response, a task execution response, or a task functions update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    if (message.ready != null && message.taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (message.taskFunctions != null) {
      // Task functions message received from worker
      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
      this.getWorkerInfo(workerNodeKey).taskFunctions = message.taskFunctions
    }
  }
}
1186
/**
 * Handles a worker ready response.
 *
 * Records the ready flag and the task functions reported by the worker in its worker info,
 * then emits the pool `ready` event if an emitter exists and the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it failed to initialize.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  if (message.ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  workerInfo.taskFunctions = message.taskFunctions
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1200
/**
 * Handles a task execution response.
 *
 * Settles the promise registered for the task id (rejecting with the task error message, or
 * resolving with the response data), runs the after task execution hook, forgets the promise,
 * then, if the tasks queue is enabled, executes the next queued task of the same worker node
 * when it runs fewer tasks than the configured concurrency. Finally the worker choice strategy
 * context is updated for that worker node.
 * Messages whose task id has no registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (taskError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1228
/**
 * Emits the task execution related pool events, if an emitter exists:
 * `busy` when the pool is busy, and `full` when the pool is dynamic and full.
 */
private checkAndEmitTaskExecutionEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1239
/**
 * Emits the pool `backPressure` event, if an emitter exists, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1245
/**
 * Looks up the worker information stored on a worker node.
 *
 * @param workerNodeKey - The key of the worker node.
 * @returns The `info` object of that worker node.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1255
/**
 * Wraps the given worker into a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker to add.
 * @returns The key of the added worker node.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    this.maxSize
  )
  if (this.starting) {
    // Flag the worker node as ready at pool startup.
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker node added not found')
  }
  return addedWorkerNodeKey
}
1280
/**
 * Removes the worker node of the given worker from the pool worker nodes, and from the worker
 * choice strategy context. Does nothing if the worker has no worker node.
 *
 * @param worker - The worker whose worker node is removed.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1293
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only meaningful when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1301
/**
 * Tells whether the pool as a whole has back pressure.
 *
 * @returns `true` if the tasks queue is enabled and no worker node is free of back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1310
/**
 * Runs the given task on the worker identified by its worker node key.
 *
 * The before task execution hook is called first, the task is then sent to the worker along
 * with its transfer list, and finally the task execution events are checked and emitted.
 *
 * @param workerNodeKey - The key of the worker node executing the task.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const transferList = task.transferList
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1322
/**
 * Enqueues the given task on a worker node, then checks and emits the task queuing events.
 *
 * @param workerNodeKey - The key of the worker node receiving the task.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
1328
/**
 * Dequeues a task from the tasks queue of a worker node.
 *
 * @param workerNodeKey - The key of the worker node.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1332
/**
 * Gets the tasks queue size of a worker node.
 *
 * @param workerNodeKey - The key of the worker node.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1336
/**
 * Flushes the tasks queue of a worker node: every queued task is dequeued and executed on that
 * same worker node, then the tasks queue is cleared.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is flushed.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1346
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1352 }