fix: fix worker choice strategy retries mechanism on some edge cases
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Dynamic pool maximum size property placeholder.
89 */
90 protected readonly max?: number
91
92 /**
93 * Whether the pool is starting or not.
94 */
95 private readonly starting: boolean
96 /**
97 * The start timestamp of the pool.
98 */
99 private readonly startTimestamp
100
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs the
 * setup hook and finally creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if one of the argument checks fails.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // A pool may only be created where `isMain()` reports true.
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Argument validation: each check throws on invalid input.
  // checkPoolOptions() also writes the defaulted values back into `this.opts`.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind these methods to the pool instance so that `this` is preserved
  // wherever they are invoked from.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  // The emitter only exists when events are enabled
  // (checkPoolOptions() defaults `enableEvents` to true).
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Extension point for subclasses, run before any worker node is created.
  this.setupHook()

  // `starting` is true only while the initial worker nodes are created:
  // the worker 'error' handler does not restart workers while it is set.
  this.starting = true
  this.startPool()
  this.starting = false

  // Reference point used by the `utilization` getter.
  this.startTimestamp = performance.now()
}
147
/**
 * Checks that the worker file path is a non blank string designating an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is nullish, not a string, blank, or does not exist on disk.
 */
private checkFilePath (filePath: string): void {
  // `null`/`undefined` fail the typeof test, so a single predicate covers
  // the missing, wrongly typed and blank cases.
  const isNonBlankString =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (existsSync(filePath)) {
    return
  }
  throw new Error(`Cannot find the worker file '${filePath}'`)
}
160
/**
 * Checks that the requested number of workers is usable for this pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is nullish.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
178
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum size is nullish.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, lower than or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  // The ordering of the range checks decides which message wins when
  // several of them apply (e.g. min = 0 and max = 0).
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
204
/**
 * Validates the pool options and stores the defaulted values in `this.opts`.
 *
 * Defaults applied: round robin worker choice strategy, default worker choice
 * strategy options, worker restart on error enabled, events enabled, tasks
 * queue disabled.
 *
 * @param opts - The pool options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  // Each value is stored before it is validated.
  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  const workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    return
  }
  // Tasks queue options are only validated and built when the queue is enabled.
  const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
232
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const isKnownStrategy = Object.values(WorkerChoiceStrategies).includes(
    workerChoiceStrategy
  )
  if (isKnownStrategy) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
242
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if `choiceRetries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `choiceRetries` is lower than or equal to zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `weights` does not have one entry per worker node of the maximum pool size, or if `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // The number of weights is compared against the maximum pool size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
286
/**
 * Checks the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are defined but not a plain object, or if `concurrency` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `concurrency` is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate without an explicit concurrency.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(
      `Invalid worker tasks concurrency: ${concurrency} is a negative integer or zero`
    )
  }
}
310
/**
 * Creates worker nodes until the number of non dynamic worker nodes
 * reaches `numberOfWorkers`.
 */
private startPool (): void {
  // Recomputed on every iteration since each creation changes `workerNodes`.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
322
/** @inheritDoc */
public get info (): PoolInfo {
  const { workerNodes } = this
  const strategyContext = this.workerChoiceStrategyContext
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Sums a numeric value picked from every worker node.
  const sumOver = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    workerNodes.reduce((total, workerNode) => total + pick(workerNode), 0)
  // Counts the worker nodes satisfying a predicate.
  const countWhere = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    workerNodes.reduce(
      (count, workerNode) => (matches(workerNode) ? count + 1 : count),
      0
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate &&
      strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: workerNodes.length,
    idleWorkerNodes: countWhere(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWhere(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOver((workerNode) => workerNode.usage.tasks.executed),
    executingTasks: sumOver((workerNode) => workerNode.usage.tasks.executing),
    // Queue related figures only exist when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOver((workerNode) => workerNode.usage.tasks.queued),
      maxQueuedTasks: sumOver(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sumOver((workerNode) => workerNode.usage.tasks.failed),
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Pool wide average: aggregated run time over executed tasks.
        average: round(
          sumOver((workerNode) => workerNode.usage.runTime?.aggregate ?? 0) /
            sumOver((workerNode) => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(strategyContext.getTaskStatisticsRequirements().runTime.median && {
          median: round(
            median(
              workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Pool wide average: aggregated wait time over executed tasks.
        average: round(
          sumOver((workerNode) => workerNode.usage.waitTime?.aggregate ?? 0) /
            sumOver((workerNode) => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(strategyContext.getTaskStatisticsRequirements().waitTime.median && {
          median: round(
            median(
              workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
466
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of ready, non dynamic worker nodes
 * is at least the minimum pool size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
481
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated tasks run time plus wait time of all worker nodes,
 * divided by the time elapsed since the pool start multiplied by the maximum pool size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    // Missing measurements count as zero.
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
502
/**
 * The pool type, provided by the concrete pool implementation.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 * The pool size checks read it to apply the fixed or dynamic specific rules.
 */
protected abstract get type (): PoolType
509
/**
 * The worker type, provided by the concrete pool implementation.
 * It is reported as `worker` in the pool information.
 */
protected abstract get worker (): WorkerType
514
/**
 * The pool minimum size.
 *
 * It is the number of workers given to the constructor.
 */
protected get minSize (): number {
  return this.numberOfWorkers
}
521
/**
 * The pool maximum size.
 *
 * It is the `max` property when defined, the number of workers otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
528
/**
 * Checks that the worker id carried by a message received from a worker
 * is present and belongs to a worker node of the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
547
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
559
/**
 * Looks up the worker node key of the worker having the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  const hasWorkerId = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
571
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node starts over with a fresh usage and is told which
  // statistics the new strategy requires.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
590
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given options take precedence over the defaults.
  const mergedOptions: WorkerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
604
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the queues when a currently enabled tasks queue gets disabled.
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
616
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Without tasks queue, previously stored options are dropped.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
627
/**
 * Builds the effective tasks queue options.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly nullish.
 * @returns The tasks queue options with `concurrency` defaulted to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
635
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches the maximum pool size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
644
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, provided by the concrete pool implementation.
 */
protected abstract get busy (): boolean
651
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * A ready worker node has spare capacity when it executes fewer tasks than the
 * tasks queue concurrency (tasks queue enabled) or no task at all (tasks queue disabled).
 *
 * @returns `true` if no ready worker node has spare capacity, `false` otherwise.
 */
protected internalBusy (): boolean {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const hasSpareCapacity = this.workerNodes.some((workerNode) => {
    if (!workerNode.info.ready) {
      return false
    }
    const executingTasks = workerNode.usage.tasks.executing
    return tasksQueueEnabled
      ? executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
      : executingTasks === 0
  })
  return !hasSpareCapacity
}
676
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node reporting a non empty list is used.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
689
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Calling reject() does not stop the executor: every validation failure
    // must also return, otherwise a worker node would still be chosen and the
    // task dispatched although its promise is already rejected.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Taken before the worker node choice; the wait time is measured from it.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // The promise callbacks are stored under the task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the chosen
    // worker node already runs its concurrency quota.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
748
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All worker node destructions are started before any of them is awaited.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
}
758
/**
 * Sends a kill message to a worker and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
774
/**
 * Terminates the worker node given its worker node key.
 * Provided by the concrete pool implementation.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise settled once the termination attempt is over.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
781
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty: subclasses override it when they need extra setup.
}
791
/**
 * Should return whether the worker is the main worker or not.
 * The constructor throws when it returns `false`.
 */
protected abstract isMain (): boolean
796
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and records the task wait time on the
 * worker usage and, when applicable, on the task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const workerUsage = workerNode.usage
  ++workerUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(workerUsage, task)
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    task.name as string
  ) as WorkerUsage
  ++taskFunctionWorkerUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
}
819
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics of the worker usage and,
 * when applicable, of the task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // The same three updates are applied to each usage object.
  const applyTaskResult = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  applyTaskResult(workerNode.usage)
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionName =
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    applyTaskResult(
      workerNode.getTaskFunctionWorkerUsage(taskFunctionName) as WorkerUsage
    )
  }
}
846
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * That is the case when the worker information lists more than two task functions.
 * NOTE(review): the threshold presumably accounts for default entries in the list — confirm.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const { taskFunctions } = this.getWorkerInfo(workerNodeKey)
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
860
/**
 * Updates the tasks counters of a worker usage once a task is done:
 * one executing task less, one executed or failed task more.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the executing tasks counter is negative.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const tasksStatistics = workerUsage.tasks
  const { executing } = tasksStatistics
  if (executing != null) {
    if (executing > 0) {
      --tasksStatistics.executing
    } else if (executing < 0) {
      throw new Error(
        'Worker usage statistic for tasks executing cannot be negative'
      )
    }
  }
  // A task error in the message marks the task as failed.
  if (message.taskError != null) {
    ++tasksStatistics.failed
  } else {
    ++tasksStatistics.executed
  }
}
885
/**
 * Updates the run time measurement statistics of a worker usage.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, a missing task run time counts as `0`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
897
/**
 * Updates the wait time measurement statistics of a worker usage.
 *
 * The task wait time is the time elapsed since the task timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp is measured against the current time.
  const taskSubmissionTime = task.timestamp ?? now
  const taskWaitTime = now - taskSubmissionTime
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
911
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, missing ELU values count as `0` for the active and idle measurements.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The stored utilization is averaged with the new one, or initialized with it.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
943
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and directly
 * chosen if the strategy policy `dynamicWorkerUsage` is set. Otherwise the
 * choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
962
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full
 * and its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
971
/**
 * Sends a message to worker given its worker node key.
 * Provided by the concrete pool implementation.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
984
/**
 * Creates a new worker.
 * Provided by the concrete pool implementation.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
991
/**
 * Creates a new, completely set up worker node.
 *
 * Registers the user supplied `online`, `message`, `error` and `exit` handlers
 * (or a no-op), an internal `error` handler and a one-shot `exit` handler
 * removing the worker node, then adds the worker node to the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', (error) => {
    // The worker node key is looked up when the error occurs.
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No replacement worker is created while the pool is starting.
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestartWorker) {
      // The replacement is of the same kind as the failed worker.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const newWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(newWorkerNodeKey)

  return newWorkerNodeKey
}
1031
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a message listener destroys the worker node on
 * kill messages, an activity check message is sent to the worker and the
 * worker information is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    // The worker node key is resolved from the worker id of each message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is only honored once the worker node has nothing left to do.
    const hasNoPendingWork = (): boolean =>
      (this.opts.enableTasksQueue === false &&
        workerUsage.tasks.executing === 0) ||
      (this.opts.enableTasksQueue === true &&
        workerUsage.tasks.executing === 0 &&
        this.tasksQueueSize(localWorkerNodeKey) === 0)
    // Kill message received from worker
    const shallDestroyWorkerNode =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    if (!shallDestroyWorkerNode) {
      return
    }
    this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // The strategy policy may flag the dynamic worker as ready right away.
  const shallBeReadyImmediately =
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  if (shallBeReadyImmediately) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1074
1075 /**
1076 * Registers a listener callback on the worker given its worker node key.
1077 *
1078 * @param workerNodeKey - The worker node key.
1079 * @param listener - The message listener callback.
1080 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  // Key (index in `workerNodes`) of the worker node whose worker is listened to.
  workerNodeKey: number,
  // Callback receiving the messages sent by that worker.
  listener: (message: MessageValue<Message>) => void
): void
1087
1088 /**
1089 * Method hooked up after a worker node has been newly created.
1090 * Can be overridden.
1091 *
1092 * @param workerNodeKey - The newly created worker node key.
1093 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Build the pool message listener and attach it to the worker.
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, poolMessageListener)
  // The startup message goes first, the statistics message second.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
}
1102
1103 /**
1104 * Sends the startup message to worker given its worker node key.
1105 *
1106 * @param workerNodeKey - The worker node key.
1107 */
// Abstract: called from `afterWorkerNodeSetup()` right after the message listener is registered.
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1109
1110 /**
1111 * Sends the statistics message to worker given its worker node key.
1112 *
1113 * @param workerNodeKey - The worker node key.
1114 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const strategyContext = this.workerChoiceStrategyContext
  // Aggregation flags come from the task statistics requirements of the worker choice strategy context.
  const runTime =
    strategyContext.getTaskStatisticsRequirements().runTime.aggregate
  const elu = strategyContext.getTaskStatisticsRequirements().elu.aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu },
    workerId
  })
}
1127
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the first other ready worker node with an empty queue is
 * preferred: the task is executed on it right away if it runs fewer tasks than
 * `tasksQueueOptions.concurrency`, otherwise it is enqueued on it. Failing that,
 * the task is enqueued on the other ready worker node with the fewest queued tasks.
 *
 * If no other ready worker node exists, the redistribution stops and the
 * remaining tasks stay queued on the given worker node.
 *
 * @param workerNodeKey - The worker node key whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other, ready worker nodes are eligible targets.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // Empty queue: best possible target, stop searching.
        executeTask =
          workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey == null) {
      // No eligible target: re-enqueueing on the same worker node would never
      // shrink its queue and this loop would spin forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1171
1172 /**
1173 * This method is the listener registered for each worker message.
1174 *
1175 * @returns The listener function to execute when a message is received from a worker.
1176 */
protected workerListener (): (message: MessageValue<Response>) => void {
  const onWorkerMessage = (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
        message.workerId
      )
      this.getWorkerInfo(senderWorkerNodeKey).taskFunctions = taskFunctions
    }
  }
  return onWorkerMessage
}
1194
/**
 * Handles the ready response sent by a worker: records its readiness and its
 * task functions, then emits the pool `ready` event once the pool is ready.
 *
 * @param message - The ready response message received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready === false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const senderInfo = this.getWorkerInfo(senderWorkerNodeKey)
  senderInfo.ready = ready as boolean
  senderInfo.taskFunctions = taskFunctions
  // Nothing to emit without an emitter or while the pool is not ready yet.
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1208
/**
 * Handles a task execution response sent by a worker: settles the promise
 * registered for the task, runs the after task execution hook, starts the next
 * queued task on the same worker node if allowed, and updates the worker
 * choice strategy context.
 *
 * Messages whose task id has no registered promise response are ignored.
 *
 * @param message - The task execution response message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  if (taskError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    // The task error is emitted, then the promise is rejected with its message.
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingResponse.reject(taskError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  // With tasks queuing enabled, start the next queued task while the worker
  // node executes fewer tasks than the configured concurrency.
  if (
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1236
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1242
/**
 * Emits the pool `backPressure` event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1248
/**
 * Emits the pool `full` event, with the pool information, when the pool is a dynamic pool and is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1256
1257 /**
1258 * Gets the worker information given its worker node key.
1259 *
1260 * @param workerNodeKey - The worker node key.
1261 * @returns The worker information.
1262 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  // The worker node key is the index of the worker node in `workerNodes`.
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1266
1267 /**
1268 * Adds the given worker in the pool worker nodes.
1269 *
1270 * @param worker - The worker.
1271 * @returns The added worker node key.
1272 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1273 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    this.maxSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  // Look the key up from the worker to check the node is really registered.
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node added not found')
}
1291
1292 /**
1293 * Removes the given worker from the pool worker nodes.
1294 *
1295 * @param worker - The worker.
1296 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  // Unknown worker: nothing to remove.
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  // The worker choice strategy context is told which key was removed.
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1304
1305 /** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queuing enabled there is never back pressure.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1312
/**
 * Whether the pool has back pressure: tasks queuing is enabled and no worker
 * node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1321
1322 /**
1323 * Executes the given task on the worker given its worker node key.
1324 *
1325 * @param workerNodeKey - The worker node key.
1326 * @param task - The task to execute.
1327 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // Hook first, then the task is sent to the worker, then the `busy` event check.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1333
/**
 * Enqueues the given task on the worker node given its worker node key, then
 * emits the `backPressure` event if the pool has back pressure.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1339
/**
 * Dequeues a task from the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
1343
/**
 * Gets the tasks queue size of the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const queriedWorkerNode = this.workerNodes[workerNodeKey]
  return queriedWorkerNode.tasksQueueSize()
}
1347
/**
 * Executes every task queued on the worker node given its worker node key,
 * then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  for (
    let remainingTasks = this.tasksQueueSize(workerNodeKey);
    remainingTasks > 0;
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const flushedWorkerNode = this.workerNodes[workerNodeKey]
  flushedWorkerNode.clearTasksQueue()
}
1357
/**
 * Flushes the tasks queue of every worker node of the pool, in worker node key order.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1363 }