refactor: cleanup error type
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Dynamic pool maximum size property placeholder.
89 */
90 protected readonly max?: number
91
92 /**
93 * Whether the pool is starting or not.
94 */
95 private readonly starting: boolean
96 /**
97 * The start timestamp of the pool.
98 */
99 private readonly startTimestamp
100
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, creates the optional event emitter and the worker
 * choice strategy context, runs the setup hook and then starts the worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation; the pool options get their default values applied here.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods to this pool instance.
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The starting flag is only raised while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
147
/**
 * Checks that the worker file path is a non blank string referencing an existing path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the file path is not specified, not a string, blank or cannot be found.
 */
private checkFilePath (filePath: string): void {
  const isNonBlankString =
    typeof filePath === 'string' && filePath.trim().length !== 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
160
/**
 * Checks the number of workers given to the pool constructor.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
178
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not specified or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is inferior or equal to the minimum pool size, or equal to zero.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
204
/**
 * Checks the pool options and stores them, completed with their default values, in `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  const workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  const enableTasksQueue = opts.enableTasksQueue ?? false
  this.opts.enableTasksQueue = enableTasksQueue
  if (enableTasksQueue) {
    // The tasks queue options are only checked and built when the tasks queue is enabled.
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
232
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is invalid.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownWorkerChoiceStrategies = Object.values(WorkerChoiceStrategies)
  if (knownWorkerChoiceStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
242
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the choice retries are not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the choice retries are not greater than zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is invalid.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
286
/**
 * Checks the tasks queue options. Undefined or null options are accepted as is.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the concurrency is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency } = tasksQueueOptions
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new RangeError(
      `Invalid worker tasks concurrency: ${concurrency} is a negative integer or zero`
    )
  }
}
310
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non dynamic worker nodes.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
322
/** @inheritDoc */
public get info (): PoolInfo {
  const strategyContext = this.workerChoiceStrategyContext
  // Sums a per worker node figure over all the worker nodes of the pool.
  const sumOverWorkerNodes = (
    figure: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + figure(workerNode),
      0
    )
  // Builds the pool level statistics of a time measurement from the worker nodes usage.
  const measurementInfo = (
    measurement: 'runTime' | 'waitTime'
  ): { minimum: number, maximum: number, average: number, median?: number } => ({
    minimum: round(
      Math.min(
        ...this.workerNodes.map(
          (workerNode) => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      Math.max(
        ...this.workerNodes.map(
          (workerNode) => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    average: round(
      sumOverWorkerNodes(
        (workerNode) => workerNode.usage[measurement]?.aggregate ?? 0
      ) /
        sumOverWorkerNodes(
          (workerNode) => workerNode.usage.tasks?.executed ?? 0
        )
    ),
    ...(strategyContext.getTaskStatisticsRequirements()[measurement]
      .median && {
      median: round(
        median(
          this.workerNodes.map(
            (workerNode) => workerNode.usage[measurement]?.median ?? 0
          )
        )
      )
    })
  })
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // The utilization is only reported when both run time and wait time are aggregated.
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate &&
      strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes((workerNode) =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes((workerNode) =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // The tasks queue figures are only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate && {
      runTime: measurementInfo('runTime')
    }),
    ...(strategyContext.getTaskStatisticsRequirements().waitTime
      .aggregate && {
      waitTime: measurementInfo('waitTime')
    })
  }
}
466
/**
 * The pool readiness boolean status: `true` once the number of ready, non dynamic
 * worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
481
/**
 * The approximate pool utilization: the aggregated tasks run time and wait time
 * of the worker nodes divided by the time elapsed since the pool start
 * multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
502
503 /**
504 * The pool type, reported as `type` in the pool information.
505 *
506 * If it is `'dynamic'`, it provides the `max` property.
507 */
508 protected abstract get type (): PoolType
509
510 /**
511 * The worker type, reported as `worker` in the pool information.
512 */
513 protected abstract get worker (): WorkerType
514
515 /**
516 * The pool minimum size: the number of workers given to the pool constructor.
517 */
518 protected get minSize (): number {
519 return this.numberOfWorkers
520 }
521
522 /**
523 * The pool maximum size: the `max` property if defined, the number of workers otherwise.
524 */
525 protected get maxSize (): number {
526 return this.max ?? this.numberOfWorkers
527 }
528
/**
 * Checks that the message received from a worker carries the id of a worker known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
547
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The key of the first worker node holding the worker, `-1` if there is none.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
559
/**
 * Gets the worker node key of the worker with the given id.
 *
 * @param workerId - The worker id.
 * @returns The key of the first worker node whose worker info has this id, `-1` if there is none.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
571
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node gets its usage reset and the statistics message sent again.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
590
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered on top of the default ones.
  const mergedWorkerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedWorkerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(
    mergedWorkerChoiceStrategyOptions
  )
}
604
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const tasksQueueGetsDisabled =
    this.opts.enableTasksQueue === true && !enable
  if (tasksQueueGetsDisabled) {
    // Flush the tasks queues before the tasks queue gets switched off.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
616
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue not enabled: drop the tasks queue options if there are any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
627
/**
 * Builds the tasks queue options, with a concurrency defaulting to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @returns The tasks queue options with the concurrency defined.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
635
636 /**
637 * Whether the pool is full or not.
638 *
639 * The pool is full once its number of worker nodes reaches the pool maximum size.
640 */
641 protected get full (): boolean {
642 return this.workerNodes.length >= this.maxSize
643 }
644
645 /**
646 * Whether the pool is busy or not.
647 *
648 * The pool busyness boolean status, to be provided by the pool implementation.
649 */
650 protected abstract get busy (): boolean
651
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has room left while it
 * executes fewer tasks than the tasks queue concurrency; otherwise it has
 * room left only when it executes no task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasWorkerNodeWithRoomLeft =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        (workerNode) =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        (workerNode) =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasWorkerNodeWithRoomLeft
}
676
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node with a non empty task functions list provides the answer.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
689
/**
 * @inheritDoc
 *
 * The returned promise is rejected, and no task is submitted to a worker, when
 * `name` is not a string or is blank, when `transferList` is not an array, or
 * when `name` is not in the task functions list of the chosen worker node.
 */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Calling reject() does not stop the executor: each failed validation
    // must return, otherwise the invalid task would still be submitted.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (typeof name === 'string' && name.trim().length === 0) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // The promise callbacks are retrieved by task id when the worker responds.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the worker
    // node already executes its concurrency quota of tasks.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
748
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All the worker nodes are destroyed concurrently.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
}
758
/**
 * Sends the kill message to the worker and waits for its kill response.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a kill `'success'` message and rejected on a kill `'failure'` message.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
774
775 /**
776 * Terminates the worker node given its worker node key. Called for each worker node when the pool is destroyed.
777 *
778 * @param workerNodeKey - The worker node key.
779 */
780 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
781
782 /**
783 * Setup hook executed by the abstract constructor before the worker nodes are created.
784 * The default implementation does nothing. Can be overridden.
785 *
786 * @virtual
787 */
788 protected setupHook (): void {
789 // Intentionally empty
790 }
791
792 /**
793 * Should return whether the worker is the main worker or not. The pool constructor throws when it returns `false`.
794 */
795 protected abstract isMain (): boolean
796
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time
 * statistics, on the worker node usage and, when task function usages are
 * tracked for the worker node, on the usage of the task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    const workerUsage = this.workerNodes[workerNodeKey].usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup, narrowed by the null check instead of being cast.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(task.name as string)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.executing
      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
    }
  }
}
826
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics, on the worker node usage
 * and, when task function usages are tracked for the worker node, on the
 * usage of the task function named in the message task performance.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    const workerUsage = this.workerNodes[workerNodeKey].usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // The same task function name, with the same default fallback, is used
    // for both the existence check and the update.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    )
    if (taskFunctionWorkerUsage != null) {
      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
    }
  }
}
860
/**
 * Whether the worker node shall update its task function worker usage or not:
 * it shall when its worker info lists more than two task functions.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
875
/**
 * Updates the tasks counters of a worker usage once a task response is received:
 * one executing task less, and one executed or failed task more.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the executing tasks counter is negative.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const taskStatistics = workerUsage.tasks
  const { executing } = taskStatistics
  if (executing != null) {
    if (executing > 0) {
      --taskStatistics.executing
    } else if (executing < 0) {
      throw new Error(
        'Worker usage statistic for tasks executing cannot be negative'
      )
    }
  }
  // A message carrying a task error counts as a failed task.
  if (message.taskError != null) {
    ++taskStatistics.failed
  } else {
    ++taskStatistics.executed
  }
}
900
/**
 * Updates the run time statistics of a worker usage with the task run time
 * found in the received message, `0` if there is none.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
912
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp, `0` if the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
926
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with
 * the ELU found in the received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The utilization is the mean of the previous value and the task one, or
  // the task one when there is no previous value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
958
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions are met. It is
 * the chosen one if the strategy policy asks for dynamic worker usage,
 * otherwise the worker choice strategy picks the worker node.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy()
    .dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
977
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
986
987 /**
988 * Sends a message to the worker given its worker node key.
989 *
990 * @param workerNodeKey - The worker node key.
991 * @param message - The message.
992 * @param transferList - The optional array of transferable objects.
993 */
994 protected abstract sendToWorker (
995 workerNodeKey: number,
996 message: MessageValue<Data>,
997 transferList?: TransferListItem[]
998 ): void
999
1000 /**
1001 * Creates a new worker. Called once for each worker node created by the pool.
1002 *
1003 * @returns Newly created worker.
1004 */
1005 protected abstract createWorker (): Worker
1006
/**
 * Creates a new, completely set up worker node.
 *
 * The worker gets the user handlers and the pool internal `error` and `exit`
 * handlers attached, is added to the pool worker nodes and goes through the
 * after worker node setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const {
    onlineHandler = EMPTY_FUNCTION,
    messageHandler = EMPTY_FUNCTION,
    errorHandler = EMPTY_FUNCTION,
    exitHandler = EMPTY_FUNCTION
  } = {
    onlineHandler: this.opts.onlineHandler ?? undefined,
    messageHandler: this.opts.messageHandler ?? undefined,
    errorHandler: this.opts.errorHandler ?? undefined,
    exitHandler: this.opts.exitHandler ?? undefined
  }

  worker.on('online', onlineHandler)
  worker.on('message', messageHandler)
  worker.on('error', errorHandler)
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(
      failedWorkerNodeKey
    ) as WorkerInfo
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement worker node of the same kind is created, except while the pool is starting.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
1046
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a listener destroys the worker
 * node when a kill message is received from the worker, the check active
 * message is sent to the worker and the worker info is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A hard kill is always honored; a soft kill only once the worker node
    // executes no task and, with the tasks queue enabled, has no queued task.
    const shallDestroyWorkerNode = (): boolean => {
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (!isKillBehavior(KillBehaviors.SOFT, message.kill)) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return workerUsage.tasks.executing === 0
      }
      return (
        this.opts.enableTasksQueue === true &&
        workerUsage.tasks.executing === 0 &&
        this.tasksQueueSize(localWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker
    if (shallDestroyWorkerNode()) {
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  const strategyPolicyWantsReadyWorker =
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  if (strategyPolicyWantsReadyWorker) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1089
  /**
   * Registers a listener callback on the worker given its worker node key.
   *
   * Abstract: the registration itself is provided by the subclasses. In this
   * class it is called from `afterWorkerNodeSetup()` with the listener built by
   * `workerListener()` and from `createAndSetupDynamicWorkerNode()` with the
   * kill message listener.
   *
   * @typeParam Message - Type of the payload carried by the received message, constrained to `Data | Response`.
   * @param workerNodeKey - The worker node key.
   * @param listener - The message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  >(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void
1102
1103 /**
1104 * Method hooked up after a worker node has been newly created.
1105 * Can be overridden.
1106 *
1107 * @param workerNodeKey - The newly created worker node key.
1108 */
1109 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1110 // Listen to worker messages.
1111 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1112 // Send the startup message to worker.
1113 this.sendStartupMessageToWorker(workerNodeKey)
1114 // Send the statistics message to worker.
1115 this.sendStatisticsMessageToWorker(workerNodeKey)
1116 }
1117
  /**
   * Sends the startup message to worker given its worker node key.
   *
   * Abstract: provided by the subclasses. In this class it is called from
   * `afterWorkerNodeSetup()`, after the message listener has been registered
   * and before the statistics message is sent.
   *
   * @param workerNodeKey - The worker node key.
   */
  protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1124
1125 /**
1126 * Sends the statistics message to worker given its worker node key.
1127 *
1128 * @param workerNodeKey - The worker node key.
1129 */
1130 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1131 this.sendToWorker(workerNodeKey, {
1132 statistics: {
1133 runTime:
1134 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1135 .runTime.aggregate,
1136 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1137 .elu.aggregate
1138 },
1139 workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
1140 })
1141 }
1142
1143 private redistributeQueuedTasks (workerNodeKey: number): void {
1144 while (this.tasksQueueSize(workerNodeKey) > 0) {
1145 let targetWorkerNodeKey: number = workerNodeKey
1146 let minQueuedTasks = Infinity
1147 let executeTask = false
1148 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1149 const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo
1150 if (
1151 workerNodeId !== workerNodeKey &&
1152 workerInfo.ready &&
1153 workerNode.usage.tasks.queued === 0
1154 ) {
1155 if (
1156 this.workerNodes[workerNodeId].usage.tasks.executing <
1157 (this.opts.tasksQueueOptions?.concurrency as number)
1158 ) {
1159 executeTask = true
1160 }
1161 targetWorkerNodeKey = workerNodeId
1162 break
1163 }
1164 if (
1165 workerNodeId !== workerNodeKey &&
1166 workerInfo.ready &&
1167 workerNode.usage.tasks.queued < minQueuedTasks
1168 ) {
1169 minQueuedTasks = workerNode.usage.tasks.queued
1170 targetWorkerNodeKey = workerNodeId
1171 }
1172 }
1173 if (executeTask) {
1174 this.executeTask(
1175 targetWorkerNodeKey,
1176 this.dequeueTask(workerNodeKey) as Task<Data>
1177 )
1178 } else {
1179 this.enqueueTask(
1180 targetWorkerNodeKey,
1181 this.dequeueTask(workerNodeKey) as Task<Data>
1182 )
1183 }
1184 }
1185 }
1186
1187 /**
1188 * This method is the listener registered for each worker message.
1189 *
1190 * @returns The listener function to execute when a message is received from a worker.
1191 */
1192 protected workerListener (): (message: MessageValue<Response>) => void {
1193 return (message) => {
1194 this.checkMessageWorkerId(message)
1195 if (message.ready != null && message.taskFunctions != null) {
1196 // Worker ready response received from worker
1197 this.handleWorkerReadyResponse(message)
1198 } else if (message.taskId != null) {
1199 // Task execution response received from worker
1200 this.handleTaskExecutionResponse(message)
1201 } else if (message.taskFunctions != null) {
1202 // Task functions message received from worker
1203 (
1204 this.getWorkerInfo(
1205 this.getWorkerNodeKeyByWorkerId(message.workerId)
1206 ) as WorkerInfo
1207 ).taskFunctions = message.taskFunctions
1208 }
1209 }
1210 }
1211
1212 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1213 if (message.ready === false) {
1214 throw new Error(`Worker ${message.workerId} failed to initialize`)
1215 }
1216 const workerInfo = this.getWorkerInfo(
1217 this.getWorkerNodeKeyByWorkerId(message.workerId)
1218 ) as WorkerInfo
1219 workerInfo.ready = message.ready as boolean
1220 workerInfo.taskFunctions = message.taskFunctions
1221 if (this.emitter != null && this.ready) {
1222 this.emitter.emit(PoolEvents.ready, this.info)
1223 }
1224 }
1225
1226 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1227 const { taskId, taskError, data } = message
1228 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1229 if (promiseResponse != null) {
1230 if (taskError != null) {
1231 this.emitter?.emit(PoolEvents.taskError, taskError)
1232 promiseResponse.reject(taskError.message)
1233 } else {
1234 promiseResponse.resolve(data as Response)
1235 }
1236 const workerNodeKey = promiseResponse.workerNodeKey
1237 this.afterTaskExecutionHook(workerNodeKey, message)
1238 this.promiseResponseMap.delete(taskId as string)
1239 if (
1240 this.opts.enableTasksQueue === true &&
1241 this.tasksQueueSize(workerNodeKey) > 0 &&
1242 this.workerNodes[workerNodeKey].usage.tasks.executing <
1243 (this.opts.tasksQueueOptions?.concurrency as number)
1244 ) {
1245 this.executeTask(
1246 workerNodeKey,
1247 this.dequeueTask(workerNodeKey) as Task<Data>
1248 )
1249 }
1250 this.workerChoiceStrategyContext.update(workerNodeKey)
1251 }
1252 }
1253
1254 private checkAndEmitTaskExecutionEvents (): void {
1255 if (this.busy) {
1256 this.emitter?.emit(PoolEvents.busy, this.info)
1257 }
1258 }
1259
1260 private checkAndEmitTaskQueuingEvents (): void {
1261 if (this.hasBackPressure()) {
1262 this.emitter?.emit(PoolEvents.backPressure, this.info)
1263 }
1264 }
1265
1266 private checkAndEmitDynamicWorkerCreationEvents (): void {
1267 if (this.type === PoolTypes.dynamic) {
1268 if (this.full) {
1269 this.emitter?.emit(PoolEvents.full, this.info)
1270 }
1271 }
1272 }
1273
1274 /**
1275 * Gets the worker information given its worker node key.
1276 *
1277 * @param workerNodeKey - The worker node key.
1278 * @returns The worker information.
1279 */
1280 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1281 return this.workerNodes[workerNodeKey]?.info
1282 }
1283
1284 /**
1285 * Adds the given worker in the pool worker nodes.
1286 *
1287 * @param worker - The worker.
1288 * @returns The added worker node key.
1289 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1290 */
1291 private addWorkerNode (worker: Worker): number {
1292 const workerNode = new WorkerNode<Worker, Data>(
1293 worker,
1294 this.worker,
1295 this.maxSize
1296 )
1297 // Flag the worker node as ready at pool startup.
1298 if (this.starting) {
1299 workerNode.info.ready = true
1300 }
1301 this.workerNodes.push(workerNode)
1302 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1303 if (workerNodeKey === -1) {
1304 throw new Error('Worker node added not found')
1305 }
1306 return workerNodeKey
1307 }
1308
1309 /**
1310 * Removes the given worker from the pool worker nodes.
1311 *
1312 * @param worker - The worker.
1313 */
1314 private removeWorkerNode (worker: Worker): void {
1315 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1316 if (workerNodeKey !== -1) {
1317 this.workerNodes.splice(workerNodeKey, 1)
1318 this.workerChoiceStrategyContext.remove(workerNodeKey)
1319 }
1320 }
1321
1322 /** @inheritDoc */
1323 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1324 return (
1325 this.opts.enableTasksQueue === true &&
1326 this.workerNodes[workerNodeKey].hasBackPressure()
1327 )
1328 }
1329
1330 private hasBackPressure (): boolean {
1331 return (
1332 this.opts.enableTasksQueue === true &&
1333 this.workerNodes.findIndex(
1334 (workerNode) => !workerNode.hasBackPressure()
1335 ) === -1
1336 )
1337 }
1338
1339 /**
1340 * Executes the given task on the worker given its worker node key.
1341 *
1342 * @param workerNodeKey - The worker node key.
1343 * @param task - The task to execute.
1344 */
1345 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1346 this.beforeTaskExecutionHook(workerNodeKey, task)
1347 this.sendToWorker(workerNodeKey, task, task.transferList)
1348 this.checkAndEmitTaskExecutionEvents()
1349 }
1350
1351 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1352 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1353 this.checkAndEmitTaskQueuingEvents()
1354 return tasksQueueSize
1355 }
1356
1357 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1358 return this.workerNodes[workerNodeKey].dequeueTask()
1359 }
1360
1361 private tasksQueueSize (workerNodeKey: number): number {
1362 return this.workerNodes[workerNodeKey].tasksQueueSize()
1363 }
1364
1365 protected flushTasksQueue (workerNodeKey: number): void {
1366 while (this.tasksQueueSize(workerNodeKey) > 0) {
1367 this.executeTask(
1368 workerNodeKey,
1369 this.dequeueTask(workerNodeKey) as Task<Data>
1370 )
1371 }
1372 this.workerNodes[workerNodeKey].clearTasksQueue()
1373 }
1374
1375 private flushTasksQueues (): void {
1376 for (const [workerNodeKey] of this.workerNodes.entries()) {
1377 this.flushTasksQueue(workerNodeKey)
1378 }
1379 }
1380 }