feat: untangle worker choice strategies tasks distribution and dynamic worker creatio...
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import { type TransferListItem } from 'node:worker_threads'
5 import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Dynamic pool maximum size property placeholder.
89 */
90 protected readonly max?: number
91
92 /**
93 * Whether the pool is starting or not.
94 */
95 private readonly starting: boolean
96 /**
97 * The start timestamp of the pool.
98 */
99 private readonly startTimestamp
100
101 /**
102 * Constructs a new poolifier pool.
103 *
104 * @param numberOfWorkers - Number of workers that this pool should manage.
105 * @param filePath - Path to the worker file.
106 * @param opts - Options for the pool.
107 */
108 public constructor (
109 protected readonly numberOfWorkers: number,
110 protected readonly filePath: string,
111 protected readonly opts: PoolOptions<Worker>
112 ) {
113 if (!this.isMain()) {
114 throw new Error(
115 'Cannot start a pool from a worker with the same type as the pool'
116 )
117 }
118 this.checkNumberOfWorkers(this.numberOfWorkers)
119 this.checkFilePath(this.filePath)
120 this.checkPoolOptions(this.opts)
121
122 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
123 this.executeTask = this.executeTask.bind(this)
124 this.enqueueTask = this.enqueueTask.bind(this)
125
126 if (this.opts.enableEvents === true) {
127 this.emitter = new PoolEmitter()
128 }
129 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
130 Worker,
131 Data,
132 Response
133 >(
134 this,
135 this.opts.workerChoiceStrategy,
136 this.opts.workerChoiceStrategyOptions
137 )
138
139 this.setupHook()
140
141 this.starting = true
142 this.startPool()
143 this.starting = false
144
145 this.startTimestamp = performance.now()
146 }
147
148 private checkFilePath (filePath: string): void {
149 if (
150 filePath == null ||
151 typeof filePath !== 'string' ||
152 (typeof filePath === 'string' && filePath.trim().length === 0)
153 ) {
154 throw new Error('Please specify a file with a worker implementation')
155 }
156 if (!existsSync(filePath)) {
157 throw new Error(`Cannot find the worker file '${filePath}'`)
158 }
159 }
160
161 private checkNumberOfWorkers (numberOfWorkers: number): void {
162 if (numberOfWorkers == null) {
163 throw new Error(
164 'Cannot instantiate a pool without specifying the number of workers'
165 )
166 } else if (!Number.isSafeInteger(numberOfWorkers)) {
167 throw new TypeError(
168 'Cannot instantiate a pool with a non safe integer number of workers'
169 )
170 } else if (numberOfWorkers < 0) {
171 throw new RangeError(
172 'Cannot instantiate a pool with a negative number of workers'
173 )
174 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
175 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
176 }
177 }
178
179 protected checkDynamicPoolSize (min: number, max: number): void {
180 if (this.type === PoolTypes.dynamic) {
181 if (max == null) {
182 throw new Error(
183 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
184 )
185 } else if (!Number.isSafeInteger(max)) {
186 throw new TypeError(
187 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
188 )
189 } else if (min > max) {
190 throw new RangeError(
191 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
192 )
193 } else if (max === 0) {
194 throw new RangeError(
195 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
196 )
197 } else if (min === max) {
198 throw new RangeError(
199 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
200 )
201 }
202 }
203 }
204
205 private checkPoolOptions (opts: PoolOptions<Worker>): void {
206 if (isPlainObject(opts)) {
207 this.opts.workerChoiceStrategy =
208 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
209 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
210 this.opts.workerChoiceStrategyOptions = {
211 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
212 ...opts.workerChoiceStrategyOptions
213 }
214 this.checkValidWorkerChoiceStrategyOptions(
215 this.opts.workerChoiceStrategyOptions
216 )
217 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
218 this.opts.enableEvents = opts.enableEvents ?? true
219 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
220 if (this.opts.enableTasksQueue) {
221 this.checkValidTasksQueueOptions(
222 opts.tasksQueueOptions as TasksQueueOptions
223 )
224 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
225 opts.tasksQueueOptions as TasksQueueOptions
226 )
227 }
228 } else {
229 throw new TypeError('Invalid pool options: must be a plain object')
230 }
231 }
232
233 private checkValidWorkerChoiceStrategy (
234 workerChoiceStrategy: WorkerChoiceStrategy
235 ): void {
236 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
237 throw new Error(
238 `Invalid worker choice strategy '${workerChoiceStrategy}'`
239 )
240 }
241 }
242
243 private checkValidWorkerChoiceStrategyOptions (
244 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
245 ): void {
246 if (!isPlainObject(workerChoiceStrategyOptions)) {
247 throw new TypeError(
248 'Invalid worker choice strategy options: must be a plain object'
249 )
250 }
251 if (
252 workerChoiceStrategyOptions.choiceRetries != null &&
253 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
254 ) {
255 throw new TypeError(
256 'Invalid worker choice strategy options: choice retries must be an integer'
257 )
258 }
259 if (
260 workerChoiceStrategyOptions.choiceRetries != null &&
261 workerChoiceStrategyOptions.choiceRetries <= 0
262 ) {
263 throw new RangeError(
264 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
265 )
266 }
267 if (
268 workerChoiceStrategyOptions.weights != null &&
269 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
270 ) {
271 throw new Error(
272 'Invalid worker choice strategy options: must have a weight for each worker node'
273 )
274 }
275 if (
276 workerChoiceStrategyOptions.measurement != null &&
277 !Object.values(Measurements).includes(
278 workerChoiceStrategyOptions.measurement
279 )
280 ) {
281 throw new Error(
282 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
283 )
284 }
285 }
286
287 private checkValidTasksQueueOptions (
288 tasksQueueOptions: TasksQueueOptions
289 ): void {
290 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
291 throw new TypeError('Invalid tasks queue options: must be a plain object')
292 }
293 if (
294 tasksQueueOptions?.concurrency != null &&
295 !Number.isSafeInteger(tasksQueueOptions.concurrency)
296 ) {
297 throw new TypeError(
298 'Invalid worker tasks concurrency: must be an integer'
299 )
300 }
301 if (
302 tasksQueueOptions?.concurrency != null &&
303 tasksQueueOptions.concurrency <= 0
304 ) {
305 throw new Error(
306 `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
307 )
308 }
309 }
310
311 private startPool (): void {
312 while (
313 this.workerNodes.reduce(
314 (accumulator, workerNode) =>
315 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
316 0
317 ) < this.numberOfWorkers
318 ) {
319 this.createAndSetupWorkerNode()
320 }
321 }
322
323 /** @inheritDoc */
324 public get info (): PoolInfo {
325 return {
326 version,
327 type: this.type,
328 worker: this.worker,
329 ready: this.ready,
330 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
331 minSize: this.minSize,
332 maxSize: this.maxSize,
333 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
334 .runTime.aggregate &&
335 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
336 .waitTime.aggregate && { utilization: round(this.utilization) }),
337 workerNodes: this.workerNodes.length,
338 idleWorkerNodes: this.workerNodes.reduce(
339 (accumulator, workerNode) =>
340 workerNode.usage.tasks.executing === 0
341 ? accumulator + 1
342 : accumulator,
343 0
344 ),
345 busyWorkerNodes: this.workerNodes.reduce(
346 (accumulator, workerNode) =>
347 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
348 0
349 ),
350 executedTasks: this.workerNodes.reduce(
351 (accumulator, workerNode) =>
352 accumulator + workerNode.usage.tasks.executed,
353 0
354 ),
355 executingTasks: this.workerNodes.reduce(
356 (accumulator, workerNode) =>
357 accumulator + workerNode.usage.tasks.executing,
358 0
359 ),
360 ...(this.opts.enableTasksQueue === true && {
361 queuedTasks: this.workerNodes.reduce(
362 (accumulator, workerNode) =>
363 accumulator + workerNode.usage.tasks.queued,
364 0
365 )
366 }),
367 ...(this.opts.enableTasksQueue === true && {
368 maxQueuedTasks: this.workerNodes.reduce(
369 (accumulator, workerNode) =>
370 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
371 0
372 )
373 }),
374 ...(this.opts.enableTasksQueue === true && {
375 backPressure: this.hasBackPressure()
376 }),
377 failedTasks: this.workerNodes.reduce(
378 (accumulator, workerNode) =>
379 accumulator + workerNode.usage.tasks.failed,
380 0
381 ),
382 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
383 .runTime.aggregate && {
384 runTime: {
385 minimum: round(
386 Math.min(
387 ...this.workerNodes.map(
388 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
389 )
390 )
391 ),
392 maximum: round(
393 Math.max(
394 ...this.workerNodes.map(
395 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
396 )
397 )
398 ),
399 average: round(
400 this.workerNodes.reduce(
401 (accumulator, workerNode) =>
402 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
403 0
404 ) /
405 this.workerNodes.reduce(
406 (accumulator, workerNode) =>
407 accumulator + (workerNode.usage.tasks?.executed ?? 0),
408 0
409 )
410 ),
411 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
412 .runTime.median && {
413 median: round(
414 median(
415 this.workerNodes.map(
416 (workerNode) => workerNode.usage.runTime?.median ?? 0
417 )
418 )
419 )
420 })
421 }
422 }),
423 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
424 .waitTime.aggregate && {
425 waitTime: {
426 minimum: round(
427 Math.min(
428 ...this.workerNodes.map(
429 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
430 )
431 )
432 ),
433 maximum: round(
434 Math.max(
435 ...this.workerNodes.map(
436 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
437 )
438 )
439 ),
440 average: round(
441 this.workerNodes.reduce(
442 (accumulator, workerNode) =>
443 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
444 0
445 ) /
446 this.workerNodes.reduce(
447 (accumulator, workerNode) =>
448 accumulator + (workerNode.usage.tasks?.executed ?? 0),
449 0
450 )
451 ),
452 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
453 .waitTime.median && {
454 median: round(
455 median(
456 this.workerNodes.map(
457 (workerNode) => workerNode.usage.waitTime?.median ?? 0
458 )
459 )
460 )
461 })
462 }
463 })
464 }
465 }
466
467 /**
468 * The pool readiness boolean status.
469 */
470 private get ready (): boolean {
471 return (
472 this.workerNodes.reduce(
473 (accumulator, workerNode) =>
474 !workerNode.info.dynamic && workerNode.info.ready
475 ? accumulator + 1
476 : accumulator,
477 0
478 ) >= this.minSize
479 )
480 }
481
482 /**
483 * The approximate pool utilization.
484 *
485 * @returns The pool utilization.
486 */
487 private get utilization (): number {
488 const poolTimeCapacity =
489 (performance.now() - this.startTimestamp) * this.maxSize
490 const totalTasksRunTime = this.workerNodes.reduce(
491 (accumulator, workerNode) =>
492 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
493 0
494 )
495 const totalTasksWaitTime = this.workerNodes.reduce(
496 (accumulator, workerNode) =>
497 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
498 0
499 )
500 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
501 }
502
503 /**
504 * The pool type.
505 *
506 * If it is `'dynamic'`, it provides the `max` property.
507 */
508 protected abstract get type (): PoolType
509
510 /**
511 * The worker type.
512 */
513 protected abstract get worker (): WorkerType
514
515 /**
516 * The pool minimum size.
517 */
518 protected get minSize (): number {
519 return this.numberOfWorkers
520 }
521
522 /**
523 * The pool maximum size.
524 */
525 protected get maxSize (): number {
526 return this.max ?? this.numberOfWorkers
527 }
528
529 /**
530 * Checks if the worker id sent in the received message from a worker is valid.
531 *
532 * @param message - The received message.
533 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
534 */
535 private checkMessageWorkerId (message: MessageValue<Response>): void {
536 if (message.workerId == null) {
537 throw new Error('Worker message received without worker id')
538 } else if (
539 message.workerId != null &&
540 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
541 ) {
542 throw new Error(
543 `Worker message received from unknown worker '${message.workerId}'`
544 )
545 }
546 }
547
548 /**
549 * Gets the given worker its worker node key.
550 *
551 * @param worker - The worker.
552 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
553 */
554 private getWorkerNodeKeyByWorker (worker: Worker): number {
555 return this.workerNodes.findIndex(
556 (workerNode) => workerNode.worker === worker
557 )
558 }
559
560 /**
561 * Gets the worker node key given its worker id.
562 *
563 * @param workerId - The worker id.
564 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
565 */
566 private getWorkerNodeKeyByWorkerId (workerId: number): number {
567 return this.workerNodes.findIndex(
568 (workerNode) => workerNode.info.id === workerId
569 )
570 }
571
572 /** @inheritDoc */
573 public setWorkerChoiceStrategy (
574 workerChoiceStrategy: WorkerChoiceStrategy,
575 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
576 ): void {
577 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
578 this.opts.workerChoiceStrategy = workerChoiceStrategy
579 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
580 this.opts.workerChoiceStrategy
581 )
582 if (workerChoiceStrategyOptions != null) {
583 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
584 }
585 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
586 workerNode.resetUsage()
587 this.sendStatisticsMessageToWorker(workerNodeKey)
588 }
589 }
590
591 /** @inheritDoc */
592 public setWorkerChoiceStrategyOptions (
593 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
594 ): void {
595 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
596 this.opts.workerChoiceStrategyOptions = {
597 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
598 ...workerChoiceStrategyOptions
599 }
600 this.workerChoiceStrategyContext.setOptions(
601 this.opts.workerChoiceStrategyOptions
602 )
603 }
604
605 /** @inheritDoc */
606 public enableTasksQueue (
607 enable: boolean,
608 tasksQueueOptions?: TasksQueueOptions
609 ): void {
610 if (this.opts.enableTasksQueue === true && !enable) {
611 this.flushTasksQueues()
612 }
613 this.opts.enableTasksQueue = enable
614 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
615 }
616
617 /** @inheritDoc */
618 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
619 if (this.opts.enableTasksQueue === true) {
620 this.checkValidTasksQueueOptions(tasksQueueOptions)
621 this.opts.tasksQueueOptions =
622 this.buildTasksQueueOptions(tasksQueueOptions)
623 } else if (this.opts.tasksQueueOptions != null) {
624 delete this.opts.tasksQueueOptions
625 }
626 }
627
628 private buildTasksQueueOptions (
629 tasksQueueOptions: TasksQueueOptions
630 ): TasksQueueOptions {
631 return {
632 concurrency: tasksQueueOptions?.concurrency ?? 1
633 }
634 }
635
636 /**
637 * Whether the pool is full or not.
638 *
639 * The pool filling boolean status.
640 */
641 protected get full (): boolean {
642 return this.workerNodes.length >= this.maxSize
643 }
644
645 /**
646 * Whether the pool is busy or not.
647 *
648 * The pool busyness boolean status.
649 */
650 protected abstract get busy (): boolean
651
652 /**
653 * Whether worker nodes are executing concurrently their tasks quota or not.
654 *
655 * @returns Worker nodes busyness boolean status.
656 */
657 protected internalBusy (): boolean {
658 if (this.opts.enableTasksQueue === true) {
659 return (
660 this.workerNodes.findIndex(
661 (workerNode) =>
662 workerNode.info.ready &&
663 workerNode.usage.tasks.executing <
664 (this.opts.tasksQueueOptions?.concurrency as number)
665 ) === -1
666 )
667 } else {
668 return (
669 this.workerNodes.findIndex(
670 (workerNode) =>
671 workerNode.info.ready && workerNode.usage.tasks.executing === 0
672 ) === -1
673 )
674 }
675 }
676
677 /** @inheritDoc */
678 public listTaskFunctions (): string[] {
679 for (const workerNode of this.workerNodes) {
680 if (
681 Array.isArray(workerNode.info.taskFunctions) &&
682 workerNode.info.taskFunctions.length > 0
683 ) {
684 return workerNode.info.taskFunctions
685 }
686 }
687 return []
688 }
689
690 /** @inheritDoc */
691 public async execute (
692 data?: Data,
693 name?: string,
694 transferList?: TransferListItem[]
695 ): Promise<Response> {
696 return await new Promise<Response>((resolve, reject) => {
697 if (name != null && typeof name !== 'string') {
698 reject(new TypeError('name argument must be a string'))
699 }
700 if (
701 name != null &&
702 typeof name === 'string' &&
703 name.trim().length === 0
704 ) {
705 reject(new TypeError('name argument must not be an empty string'))
706 }
707 if (transferList != null && !Array.isArray(transferList)) {
708 reject(new TypeError('transferList argument must be an array'))
709 }
710 const timestamp = performance.now()
711 const workerNodeKey = this.chooseWorkerNode()
712 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
713 if (
714 name != null &&
715 Array.isArray(workerInfo.taskFunctions) &&
716 !workerInfo.taskFunctions.includes(name)
717 ) {
718 reject(
719 new Error(`Task function '${name}' is not registered in the pool`)
720 )
721 }
722 const task: Task<Data> = {
723 name: name ?? DEFAULT_TASK_NAME,
724 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
725 data: data ?? ({} as Data),
726 transferList,
727 timestamp,
728 workerId: workerInfo.id as number,
729 taskId: randomUUID()
730 }
731 this.promiseResponseMap.set(task.taskId as string, {
732 resolve,
733 reject,
734 workerNodeKey
735 })
736 if (
737 this.opts.enableTasksQueue === false ||
738 (this.opts.enableTasksQueue === true &&
739 this.workerNodes[workerNodeKey].usage.tasks.executing <
740 (this.opts.tasksQueueOptions?.concurrency as number))
741 ) {
742 this.executeTask(workerNodeKey, task)
743 } else {
744 this.enqueueTask(workerNodeKey, task)
745 }
746 })
747 }
748
749 /** @inheritDoc */
750 public async destroy (): Promise<void> {
751 await Promise.all(
752 this.workerNodes.map(async (_, workerNodeKey) => {
753 await this.destroyWorkerNode(workerNodeKey)
754 })
755 )
756 this.emitter?.emit(PoolEvents.destroy, this.info)
757 }
758
759 protected async sendKillMessageToWorker (
760 workerNodeKey: number,
761 workerId: number
762 ): Promise<void> {
763 await new Promise<void>((resolve, reject) => {
764 this.registerWorkerMessageListener(workerNodeKey, (message) => {
765 if (message.kill === 'success') {
766 resolve()
767 } else if (message.kill === 'failure') {
768 reject(new Error(`Worker ${workerId} kill message handling failed`))
769 }
770 })
771 this.sendToWorker(workerNodeKey, { kill: true, workerId })
772 })
773 }
774
775 /**
776 * Terminates the worker node given its worker node key.
777 *
778 * @param workerNodeKey - The worker node key.
779 */
780 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
781
782 /**
783 * Setup hook to execute code before worker nodes are created in the abstract constructor.
784 * Can be overridden.
785 *
786 * @virtual
787 */
788 protected setupHook (): void {
789 // Intentionally empty
790 }
791
792 /**
793 * Should return whether the worker is the main worker or not.
794 */
795 protected abstract isMain (): boolean
796
797 /**
798 * Hook executed before the worker task execution.
799 * Can be overridden.
800 *
801 * @param workerNodeKey - The worker node key.
802 * @param task - The task to execute.
803 */
804 protected beforeTaskExecutionHook (
805 workerNodeKey: number,
806 task: Task<Data>
807 ): void {
808 if (this.workerNodes[workerNodeKey]?.usage != null) {
809 const workerUsage = this.workerNodes[workerNodeKey].usage
810 ++workerUsage.tasks.executing
811 this.updateWaitTimeWorkerUsage(workerUsage, task)
812 }
813 if (
814 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
815 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
816 task.name as string
817 ) != null
818 ) {
819 const taskFunctionWorkerUsage = this.workerNodes[
820 workerNodeKey
821 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
822 if (taskFunctionWorkerUsage != null) {
823 ++taskFunctionWorkerUsage.tasks.executing
824 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
825 }
826 }
827 }
828
829 /**
830 * Hook executed after the worker task execution.
831 * Can be overridden.
832 *
833 * @param workerNodeKey - The worker node key.
834 * @param message - The received message.
835 */
836 protected afterTaskExecutionHook (
837 workerNodeKey: number,
838 message: MessageValue<Response>
839 ): void {
840 if (this.workerNodes[workerNodeKey]?.usage != null) {
841 const workerUsage = this.workerNodes[workerNodeKey].usage
842 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
843 this.updateRunTimeWorkerUsage(workerUsage, message)
844 this.updateEluWorkerUsage(workerUsage, message)
845 }
846 if (
847 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
848 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
849 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
850 ) != null
851 ) {
852 const taskFunctionWorkerUsage = this.workerNodes[
853 workerNodeKey
854 ].getTaskFunctionWorkerUsage(
855 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
856 ) as WorkerUsage
857 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
858 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
859 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
860 }
861 }
862
863 /**
864 * Whether the worker node shall update its task function worker usage or not.
865 *
866 * @param workerNodeKey - The worker node key.
867 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
868 */
869 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
870 const workerInfo = this.getWorkerInfo(workerNodeKey)
871 return (
872 workerInfo != null &&
873 Array.isArray(workerInfo.taskFunctions) &&
874 workerInfo.taskFunctions.length > 2
875 )
876 }
877
878 private updateTaskStatisticsWorkerUsage (
879 workerUsage: WorkerUsage,
880 message: MessageValue<Response>
881 ): void {
882 const workerTaskStatistics = workerUsage.tasks
883 if (
884 workerTaskStatistics.executing != null &&
885 workerTaskStatistics.executing > 0
886 ) {
887 --workerTaskStatistics.executing
888 } else if (
889 workerTaskStatistics.executing != null &&
890 workerTaskStatistics.executing < 0
891 ) {
892 throw new Error(
893 'Worker usage statistic for tasks executing cannot be negative'
894 )
895 }
896 if (message.taskError == null) {
897 ++workerTaskStatistics.executed
898 } else {
899 ++workerTaskStatistics.failed
900 }
901 }
902
903 private updateRunTimeWorkerUsage (
904 workerUsage: WorkerUsage,
905 message: MessageValue<Response>
906 ): void {
907 updateMeasurementStatistics(
908 workerUsage.runTime,
909 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
910 message.taskPerformance?.runTime ?? 0,
911 workerUsage.tasks.executed
912 )
913 }
914
915 private updateWaitTimeWorkerUsage (
916 workerUsage: WorkerUsage,
917 task: Task<Data>
918 ): void {
919 const timestamp = performance.now()
920 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
921 updateMeasurementStatistics(
922 workerUsage.waitTime,
923 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
924 taskWaitTime,
925 workerUsage.tasks.executed
926 )
927 }
928
929 private updateEluWorkerUsage (
930 workerUsage: WorkerUsage,
931 message: MessageValue<Response>
932 ): void {
933 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
934 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
935 updateMeasurementStatistics(
936 workerUsage.elu.active,
937 eluTaskStatisticsRequirements,
938 message.taskPerformance?.elu?.active ?? 0,
939 workerUsage.tasks.executed
940 )
941 updateMeasurementStatistics(
942 workerUsage.elu.idle,
943 eluTaskStatisticsRequirements,
944 message.taskPerformance?.elu?.idle ?? 0,
945 workerUsage.tasks.executed
946 )
947 if (eluTaskStatisticsRequirements.aggregate) {
948 if (message.taskPerformance?.elu != null) {
949 if (workerUsage.elu.utilization != null) {
950 workerUsage.elu.utilization =
951 (workerUsage.elu.utilization +
952 message.taskPerformance.elu.utilization) /
953 2
954 } else {
955 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
956 }
957 }
958 }
959 }
960
961 /**
962 * Chooses a worker node for the next task.
963 *
964 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
965 *
966 * @returns The chosen worker node key
967 */
968 private chooseWorkerNode (): number {
969 if (this.shallCreateDynamicWorker()) {
970 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
971 if (
972 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
973 ) {
974 return workerNodeKey
975 }
976 }
977 return this.workerChoiceStrategyContext.execute()
978 }
979
980 /**
981 * Conditions for dynamic worker creation.
982 *
983 * @returns Whether to create a dynamic worker or not.
984 */
985 private shallCreateDynamicWorker (): boolean {
986 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
987 }
988
989 /**
990 * Sends a message to worker given its worker node key.
991 *
992 * @param workerNodeKey - The worker node key.
993 * @param message - The message.
994 * @param transferList - The optional array of transferable objects.
995 */
996 protected abstract sendToWorker (
997 workerNodeKey: number,
998 message: MessageValue<Data>,
999 transferList?: TransferListItem[]
1000 ): void
1001
1002 /**
1003 * Creates a new worker.
1004 *
1005 * @returns Newly created worker.
1006 */
1007 protected abstract createWorker (): Worker
1008
1009 /**
1010 * Creates a new, completely set up worker node.
1011 *
1012 * @returns New, completely set up worker node key.
1013 */
1014 protected createAndSetupWorkerNode (): number {
1015 const worker = this.createWorker()
1016
1017 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
1018 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
1019 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1020 worker.on('error', (error) => {
1021 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1022 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
1023 workerInfo.ready = false
1024 this.workerNodes[workerNodeKey].closeChannel()
1025 this.emitter?.emit(PoolEvents.error, error)
1026 if (this.opts.restartWorkerOnError === true && !this.starting) {
1027 if (workerInfo.dynamic) {
1028 this.createAndSetupDynamicWorkerNode()
1029 } else {
1030 this.createAndSetupWorkerNode()
1031 }
1032 }
1033 if (this.opts.enableTasksQueue === true) {
1034 this.redistributeQueuedTasks(workerNodeKey)
1035 }
1036 })
1037 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
1038 worker.once('exit', () => {
1039 this.removeWorkerNode(worker)
1040 })
1041
1042 const workerNodeKey = this.addWorkerNode(worker)
1043
1044 this.afterWorkerNodeSetup(workerNodeKey)
1045
1046 return workerNodeKey
1047 }
1048
1049 /**
1050 * Creates a new, completely set up dynamic worker node.
1051 *
1052 * @returns New, completely set up dynamic worker node key.
1053 */
1054 protected createAndSetupDynamicWorkerNode (): number {
1055 const workerNodeKey = this.createAndSetupWorkerNode()
1056 this.registerWorkerMessageListener(workerNodeKey, (message) => {
1057 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1058 message.workerId
1059 )
1060 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
1061 // Kill message received from worker
1062 if (
1063 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1064 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
1065 ((this.opts.enableTasksQueue === false &&
1066 workerUsage.tasks.executing === 0) ||
1067 (this.opts.enableTasksQueue === true &&
1068 workerUsage.tasks.executing === 0 &&
1069 this.tasksQueueSize(localWorkerNodeKey) === 0)))
1070 ) {
1071 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1072 this.emitter?.emit(PoolEvents.error, error)
1073 })
1074 }
1075 })
1076 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
1077 this.sendToWorker(workerNodeKey, {
1078 checkActive: true,
1079 workerId: workerInfo.id as number
1080 })
1081 workerInfo.dynamic = true
1082 if (
1083 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1084 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1085 ) {
1086 workerInfo.ready = true
1087 }
1088 this.checkAndEmitDynamicWorkerCreationEvents()
1089 return workerNodeKey
1090 }
1091
1092 /**
1093 * Registers a listener callback on the worker given its worker node key.
1094 *
1095 * @param workerNodeKey - The worker node key.
1096 * @param listener - The message listener callback.
1097 */
1098 protected abstract registerWorkerMessageListener<
1099 Message extends Data | Response
1100 >(
1101 workerNodeKey: number,
1102 listener: (message: MessageValue<Message>) => void
1103 ): void
1104
1105 /**
1106 * Method hooked up after a worker node has been newly created.
1107 * Can be overridden.
1108 *
1109 * @param workerNodeKey - The newly created worker node key.
1110 */
1111 protected afterWorkerNodeSetup (workerNodeKey: number): void {
1112 // Listen to worker messages.
1113 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
1114 // Send the startup message to worker.
1115 this.sendStartupMessageToWorker(workerNodeKey)
1116 // Send the statistics message to worker.
1117 this.sendStatisticsMessageToWorker(workerNodeKey)
1118 }
1119
1120 /**
1121 * Sends the startup message to worker given its worker node key.
1122 *
1123 * @param workerNodeKey - The worker node key.
1124 */
1125 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1126
1127 /**
1128 * Sends the statistics message to worker given its worker node key.
1129 *
1130 * @param workerNodeKey - The worker node key.
1131 */
1132 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
1133 this.sendToWorker(workerNodeKey, {
1134 statistics: {
1135 runTime:
1136 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1137 .runTime.aggregate,
1138 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1139 .elu.aggregate
1140 },
1141 workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
1142 })
1143 }
1144
1145 private redistributeQueuedTasks (workerNodeKey: number): void {
1146 while (this.tasksQueueSize(workerNodeKey) > 0) {
1147 let targetWorkerNodeKey: number = workerNodeKey
1148 let minQueuedTasks = Infinity
1149 let executeTask = false
1150 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1151 const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo
1152 if (
1153 workerNodeId !== workerNodeKey &&
1154 workerInfo.ready &&
1155 workerNode.usage.tasks.queued === 0
1156 ) {
1157 if (
1158 this.workerNodes[workerNodeId].usage.tasks.executing <
1159 (this.opts.tasksQueueOptions?.concurrency as number)
1160 ) {
1161 executeTask = true
1162 }
1163 targetWorkerNodeKey = workerNodeId
1164 break
1165 }
1166 if (
1167 workerNodeId !== workerNodeKey &&
1168 workerInfo.ready &&
1169 workerNode.usage.tasks.queued < minQueuedTasks
1170 ) {
1171 minQueuedTasks = workerNode.usage.tasks.queued
1172 targetWorkerNodeKey = workerNodeId
1173 }
1174 }
1175 if (executeTask) {
1176 this.executeTask(
1177 targetWorkerNodeKey,
1178 this.dequeueTask(workerNodeKey) as Task<Data>
1179 )
1180 } else {
1181 this.enqueueTask(
1182 targetWorkerNodeKey,
1183 this.dequeueTask(workerNodeKey) as Task<Data>
1184 )
1185 }
1186 }
1187 }
1188
1189 /**
1190 * This method is the listener registered for each worker message.
1191 *
1192 * @returns The listener function to execute when a message is received from a worker.
1193 */
1194 protected workerListener (): (message: MessageValue<Response>) => void {
1195 return (message) => {
1196 this.checkMessageWorkerId(message)
1197 if (message.ready != null && message.taskFunctions != null) {
1198 // Worker ready response received from worker
1199 this.handleWorkerReadyResponse(message)
1200 } else if (message.taskId != null) {
1201 // Task execution response received from worker
1202 this.handleTaskExecutionResponse(message)
1203 } else if (message.taskFunctions != null) {
1204 // Task functions message received from worker
1205 (
1206 this.getWorkerInfo(
1207 this.getWorkerNodeKeyByWorkerId(message.workerId)
1208 ) as WorkerInfo
1209 ).taskFunctions = message.taskFunctions
1210 }
1211 }
1212 }
1213
1214 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1215 if (message.ready === false) {
1216 throw new Error(`Worker ${message.workerId} failed to initialize`)
1217 }
1218 const workerInfo = this.getWorkerInfo(
1219 this.getWorkerNodeKeyByWorkerId(message.workerId)
1220 ) as WorkerInfo
1221 workerInfo.ready = message.ready as boolean
1222 workerInfo.taskFunctions = message.taskFunctions
1223 if (this.emitter != null && this.ready) {
1224 this.emitter.emit(PoolEvents.ready, this.info)
1225 }
1226 }
1227
1228 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1229 const { taskId, taskError, data } = message
1230 const promiseResponse = this.promiseResponseMap.get(taskId as string)
1231 if (promiseResponse != null) {
1232 if (taskError != null) {
1233 this.emitter?.emit(PoolEvents.taskError, taskError)
1234 promiseResponse.reject(taskError.message)
1235 } else {
1236 promiseResponse.resolve(data as Response)
1237 }
1238 const workerNodeKey = promiseResponse.workerNodeKey
1239 this.afterTaskExecutionHook(workerNodeKey, message)
1240 this.promiseResponseMap.delete(taskId as string)
1241 if (
1242 this.opts.enableTasksQueue === true &&
1243 this.tasksQueueSize(workerNodeKey) > 0 &&
1244 this.workerNodes[workerNodeKey].usage.tasks.executing <
1245 (this.opts.tasksQueueOptions?.concurrency as number)
1246 ) {
1247 this.executeTask(
1248 workerNodeKey,
1249 this.dequeueTask(workerNodeKey) as Task<Data>
1250 )
1251 }
1252 this.workerChoiceStrategyContext.update(workerNodeKey)
1253 }
1254 }
1255
1256 private checkAndEmitTaskExecutionEvents (): void {
1257 if (this.busy) {
1258 this.emitter?.emit(PoolEvents.busy, this.info)
1259 }
1260 }
1261
1262 private checkAndEmitTaskQueuingEvents (): void {
1263 if (this.hasBackPressure()) {
1264 this.emitter?.emit(PoolEvents.backPressure, this.info)
1265 }
1266 }
1267
1268 private checkAndEmitDynamicWorkerCreationEvents (): void {
1269 if (this.type === PoolTypes.dynamic) {
1270 if (this.full) {
1271 this.emitter?.emit(PoolEvents.full, this.info)
1272 }
1273 }
1274 }
1275
1276 /**
1277 * Gets the worker information given its worker node key.
1278 *
1279 * @param workerNodeKey - The worker node key.
1280 * @returns The worker information.
1281 */
1282 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1283 return this.workerNodes[workerNodeKey]?.info
1284 }
1285
1286 /**
1287 * Adds the given worker in the pool worker nodes.
1288 *
1289 * @param worker - The worker.
1290 * @returns The added worker node key.
1291 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1292 */
1293 private addWorkerNode (worker: Worker): number {
1294 const workerNode = new WorkerNode<Worker, Data>(
1295 worker,
1296 this.worker,
1297 this.maxSize
1298 )
1299 // Flag the worker node as ready at pool startup.
1300 if (this.starting) {
1301 workerNode.info.ready = true
1302 }
1303 this.workerNodes.push(workerNode)
1304 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1305 if (workerNodeKey === -1) {
1306 throw new Error('Worker node added not found')
1307 }
1308 return workerNodeKey
1309 }
1310
1311 /**
1312 * Removes the given worker from the pool worker nodes.
1313 *
1314 * @param worker - The worker.
1315 */
1316 private removeWorkerNode (worker: Worker): void {
1317 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1318 if (workerNodeKey !== -1) {
1319 this.workerNodes.splice(workerNodeKey, 1)
1320 this.workerChoiceStrategyContext.remove(workerNodeKey)
1321 }
1322 }
1323
1324 /** @inheritDoc */
1325 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
1326 return (
1327 this.opts.enableTasksQueue === true &&
1328 this.workerNodes[workerNodeKey].hasBackPressure()
1329 )
1330 }
1331
1332 private hasBackPressure (): boolean {
1333 return (
1334 this.opts.enableTasksQueue === true &&
1335 this.workerNodes.findIndex(
1336 (workerNode) => !workerNode.hasBackPressure()
1337 ) === -1
1338 )
1339 }
1340
1341 /**
1342 * Executes the given task on the worker given its worker node key.
1343 *
1344 * @param workerNodeKey - The worker node key.
1345 * @param task - The task to execute.
1346 */
1347 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1348 this.beforeTaskExecutionHook(workerNodeKey, task)
1349 this.sendToWorker(workerNodeKey, task, task.transferList)
1350 this.checkAndEmitTaskExecutionEvents()
1351 }
1352
1353 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1354 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1355 this.checkAndEmitTaskQueuingEvents()
1356 return tasksQueueSize
1357 }
1358
1359 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1360 return this.workerNodes[workerNodeKey].dequeueTask()
1361 }
1362
1363 private tasksQueueSize (workerNodeKey: number): number {
1364 return this.workerNodes[workerNodeKey].tasksQueueSize()
1365 }
1366
1367 protected flushTasksQueue (workerNodeKey: number): void {
1368 while (this.tasksQueueSize(workerNodeKey) > 0) {
1369 this.executeTask(
1370 workerNodeKey,
1371 this.dequeueTask(workerNodeKey) as Task<Data>
1372 )
1373 }
1374 this.workerNodes[workerNodeKey].clearTasksQueue()
1375 }
1376
1377 private flushTasksQueues (): void {
1378 for (const [workerNodeKey] of this.workerNodes.entries()) {
1379 this.flushTasksQueue(workerNodeKey)
1380 }
1381 }
1382 }