fix: close communication channel on error
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round,
17 updateMeasurementStatistics
18 } from '../utils'
19 import { KillBehaviors } from '../worker/worker-options'
20 import {
21 type IPool,
22 PoolEmitter,
23 PoolEvents,
24 type PoolInfo,
25 type PoolOptions,
26 type PoolType,
27 PoolTypes,
28 type TasksQueueOptions
29 } from './pool'
30 import type {
31 IWorker,
32 IWorkerNode,
33 WorkerInfo,
34 WorkerType,
35 WorkerUsage
36 } from './worker'
37 import {
38 type MeasurementStatisticsRequirements,
39 Measurements,
40 WorkerChoiceStrategies,
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
43 } from './selection-strategies/selection-strategies-types'
44 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
45 import { version } from './version'
46 import { WorkerNode } from './worker-node'
47
48 /**
49 * Base class that implements some shared logic for all poolifier pools.
50 *
51 * @typeParam Worker - Type of worker which manages this pool.
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
54 */
55 export abstract class AbstractPool<
56 Worker extends IWorker,
57 Data = unknown,
58 Response = unknown
59 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions awaiting a response from a worker.
 *
 * - `key`: The message id of each submitted task.
 * - `value`: The worker the task was submitted to, plus the `resolve` and `reject` callbacks of the promise handed back to the caller.
 *
 * An entry is added when a task is submitted and removed once the response message carrying the same id has been handled.
 */
protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
> = new Map<string, PromiseResponseWrapper<Worker, Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>

/**
 * Whether the pool is starting or not.
 * It is `true` only while the constructor creates the initial worker nodes.
 */
private readonly starting: boolean
/**
 * The start timestamp of the pool, read from `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp: number
96
/**
 * Builds a pool: validates the arguments, wires the optional event emitter
 * and the worker choice strategy context, then creates the initial workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If called outside the main worker or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation, in the order the arguments are declared.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods that are handed around as plain functions.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // `starting` brackets the creation of the initial worker nodes.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
142
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is not a non-blank string or if no file exists at that path.
 */
private checkFilePath (filePath: string): void {
  // `null` and `undefined` are rejected by the `typeof` test as well.
  const isBlankOrMissing =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (isBlankOrMissing) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const fileFound = existsSync(filePath)
  if (!fileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
155
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number is missing.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused an empty set of workers.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
173
/**
 * Validates the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `min` is greater than `max`, `max` is zero, or both are equal.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
191
/**
 * Validates the pool options and writes the defaults of the unset ones into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy, round robin unless specified.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only looked at when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
218
/**
 * Validates that the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
228
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, one is expected per possible worker node.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
256
/**
 * Validates the tasks queue options. Missing options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
280
/**
 * Creates workers until the number of non dynamic worker nodes reaches `numberOfWorkers`.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }
}
292
/**
 * Snapshot of the pool state, aggregated over all worker nodes.
 *
 * `utilization`, `runTime` and `waitTime` are only present when the current
 * worker choice strategy requires the matching measurements to be aggregated.
 * The `average` of `runTime` and `waitTime` is `0` until a task has been
 * executed, instead of the `NaN` a division by zero would yield.
 *
 * @inheritDoc
 */
public get info (): PoolInfo {
  // Looked up once and reused below.
  // NOTE(review): assumes this call is a side-effect free accessor - confirm.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Adds up one numeric value per worker node.
  const sumOverWorkerNodes = (
    select: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + select(workerNode),
      0
    )
  const executedTasks = sumOverWorkerNodes(
    workerNode => workerNode.usage.tasks.executed
  )
  // Guards the division: no executed task means no average yet.
  const averagePerExecutedTask = (aggregate: number): number =>
    executedTasks > 0 ? aggregate / executedTasks : 0
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: this.workerNodes.filter(
      workerNode => workerNode.usage.tasks.executing === 0
    ).length,
    busyWorkerNodes: this.workerNodes.filter(
      workerNode => workerNode.usage.tasks.executing > 0
    ).length,
    executedTasks,
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sumOverWorkerNodes(
              workerNode => workerNode.usage.runTime?.aggregate ?? 0
            )
          )
        ),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sumOverWorkerNodes(
              workerNode => workerNode.usage.waitTime?.aggregate ?? 0
            )
          )
        ),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
429
/**
 * Whether the pool is ready: at least `minSize` non dynamic worker nodes are flagged ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
441
/**
 * Approximate pool utilization: the aggregated tasks run and wait times of all
 * worker nodes, divided by the time elapsed since the pool start multiplied by
 * the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
462
/**
 * The pool type, provided by the concrete pool.
 *
 * The checks of this class treat `PoolTypes.fixed` and `PoolTypes.dynamic` differently.
 */
protected abstract get type (): PoolType

/**
 * The worker type, provided by the concrete pool.
 */
protected abstract get worker (): WorkerType

/**
 * The pool minimum size, provided by the concrete pool.
 */
protected abstract get minSize (): number

/**
 * The pool maximum size, provided by the concrete pool.
 */
protected abstract get maxSize (): number
484
/**
 * Looks up a worker by the id stored in its worker node information.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    workerNode => workerNode.info.id === workerId
  )
  return matchingWorkerNode?.worker
}
495
/**
 * Checks that the worker id carried by a received message, if any, belongs to a worker of this pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  // Messages without a worker id are not checked.
  if (message.workerId == null) {
    return
  }
  if (this.getWorkerById(message.workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
512
/**
 * Finds the position of the given worker in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
protected getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
524
/**
 * Switches the worker choice strategy, then resets the usage of every worker
 * node and resends the statistics settings to its worker.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
543
/**
 * Validates the given options, stores them in the pool options and passes them
 * on to the worker choice strategy context.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
554
/**
 * Turns the tasks queue on or off. Queued tasks are flushed when an enabled
 * queue gets disabled.
 *
 * @inheritDoc
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
566
/**
 * Validates and stores the tasks queue options when the queue is enabled,
 * otherwise drops any stored tasks queue options.
 *
 * @inheritDoc
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
577
/**
 * Builds the effective tasks queue options from the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly missing.
 * @returns The tasks queue options with `concurrency` defaulted to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
585
/**
 * Whether the pool is full or not: the number of worker nodes has reached the pool maximum size.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
594
/**
 * Whether the pool is busy or not, as defined by the concrete pool.
 *
 * The pool busyness boolean status.
 */
protected abstract get busy (): boolean
601
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` when no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
614
/**
 * Submits a task to a chosen worker node. The task is queued instead of being
 * executed right away when the tasks queue is enabled and either the pool is
 * busy or the chosen worker node already executes as many tasks as the
 * configured concurrency.
 *
 * @inheritDoc
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const chosenWorkerNode = this.workerNodes[workerNodeKey]
  const submittedTask: Task<Data> = {
    name: name ?? DEFAULT_TASK_NAME,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    workerId: this.getWorkerInfo(workerNodeKey).id as number,
    id: randomUUID()
  }
  // Settled later, when the response carrying the task id is received.
  const res = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: chosenWorkerNode.worker
    })
  })
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      chosenWorkerNode.usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
649
/**
 * Flushes the tasks queue of every worker node, destroys its worker and waits
 * for each worker to emit `exit`.
 *
 * @inheritDoc
 */
public async destroy (): Promise<void> {
  await Promise.all(
    this.workerNodes.map(async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      const workerExitPromise = new Promise<void>(resolve => {
        // One-shot listener, like the pool's own exit listener: the promise
        // only needs the first `exit` and nothing stays registered afterwards.
        workerNode.worker.once('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await workerExitPromise
    })
  )
}
666
/**
 * Terminates the given worker. Implemented by the concrete pool.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
673
/**
 * Extension point called by the abstract constructor right before the worker nodes are created.
 * The default implementation does nothing. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default behavior
}
683
/**
 * Should return whether the worker is the main worker or not.
 * The constructor refuses to build a pool when this returns `false`.
 */
protected abstract isMain (): boolean
688
/**
 * Hook executed before the worker task execution.
 * Counts the task as executing and records its wait time, on both the worker
 * node usage and the usage kept for the task name.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const trackTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  trackTaskStart(this.workerNodes[workerNodeKey].usage)
  trackTaskStart(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      task.name as string
    ) as WorkerUsage
  )
}
709
/**
 * Hook executed after the worker task execution.
 * Updates the task counters, the run time and the ELU statistics, on both the
 * worker node usage and the usage kept for the task name.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  const trackTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  trackTaskEnd(workerNode.usage)
  trackTaskEnd(
    workerNode.getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
733
/**
 * Moves one task out of the executing count and into the failed count when the
 * message carries a task error, into the executed count otherwise.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
746
/**
 * Feeds the task run time reported in the message (`0` when absent) into the run time statistics.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { runTime: runTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
758
/**
 * Feeds the time elapsed since the task timestamp (`0` when the task has none) into the wait time statistics.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const { waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
772
/**
 * Feeds the active and idle ELU values reported in the message (`0` when
 * absent) into the ELU statistics. When ELU aggregation is required and the
 * message carries ELU data, the stored utilization becomes the mean of the
 * stored and the reported utilization, or the reported one if none is stored.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
804
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met. It is
 * chosen directly if the strategy policy says to use dynamic workers. In every
 * other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
823
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
832
/**
 * Sends a message to the given worker. Implemented by the concrete pool.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
843
/**
 * Creates a new worker. Implemented by the concrete pool.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
850
/**
 * Creates a new worker, attaches the event listeners, adds it to the pool
 * worker nodes and runs the post setup hook.
 *
 * On a worker `error` event the worker node is flagged not ready, its channel
 * is closed and the error is emitted on the pool emitter. Depending on the
 * pool options a replacement worker is then created and the queued tasks of
 * the failed worker node are redistributed.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKey(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No restart while the pool is still starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorker()
    } else if (shallRestart) {
      this.createAndSetupWorker()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node goes away with its worker.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.addWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
890
/**
 * Creates a new worker flagged as dynamic and sets it up completely in the pool worker nodes.
 *
 * A message listener destroys the worker when it sends a hard kill message, or
 * any kill message while it has no executing task and, with the tasks queue
 * enabled, no queued task either. The worker is finally sent a `checkActive`
 * message.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const hasNoExecutingTask = (): boolean =>
      this.workerNodes[workerNodeKey].usage.tasks.executing === 0
    // Whether the worker node has nothing left to do, given the queue setting.
    const isDrained = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return hasNoExecutingTask()
      }
      if (this.opts.enableTasksQueue === true) {
        return hasNoExecutingTask() && this.tasksQueueSize(workerNodeKey) === 0
      }
      return false
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && isDrained())
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const workerInfo = this.getWorkerInfoByWorker(worker)
  workerInfo.dynamic = true
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    workerInfo.ready = true
  }
  this.sendToWorker(worker, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  return worker
}
924
/**
 * Registers a message listener callback on the given worker. Implemented by the concrete pool.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
934
/**
 * Hook run once a newly created worker has been added to the pool worker nodes.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // 1. Start listening to the messages sent by the worker.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
  // 2. Tell the worker to start up.
  this.sendStartupMessageToWorker(worker)
  // 3. Tell the worker which task statistics to compute.
  this.setWorkerStatistics(worker)
}
949
/**
 * Sends the startup message to the given worker. Implemented by the concrete pool.
 *
 * @param worker - The worker which should receive the startup message.
 */
protected abstract sendStartupMessageToWorker (worker: Worker): void
956
/**
 * Moves the queued tasks of the given worker node to the other ready worker nodes.
 *
 * Each task goes to the first other ready worker node with an empty queue,
 * or else to the other ready worker node with the fewest queued tasks.
 * When no other ready worker node exists the remaining tasks are left where
 * they are: re-enqueueing them on the same worker node would never empty its
 * queue and the loop would not terminate.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty queue cannot be beaten, stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other ready worker node can take over the tasks.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
986
/**
 * Builds the listener registered for each worker message. The listener checks
 * the worker id of the message, then handles it as a worker ready response or
 * as a task execution response. Other messages are ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1004
/**
 * Stores the ready status sent by a worker and emits the pool `ready` event
 * when events are enabled and the pool is ready.
 *
 * @param message - The received worker ready message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId) as Worker
  this.getWorkerInfoByWorker(worker).ready = message.ready as boolean
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1013
/**
 * Settles the promise of the task the message responds to, updates the worker
 * usage, starts the next queued task of the worker node if any and notifies
 * the worker choice strategy context.
 * Messages whose id matches no pending task are ignored.
 *
 * @param message - The received task execution message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    promiseResponse.reject(message.taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1038
/**
 * Emits the `busy` event when the pool is busy and the `full` event when a
 * dynamic pool is full. Does nothing when events are disabled.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isFullDynamicPool = this.type === PoolTypes.dynamic && this.full
  if (isFullDynamicPool) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1049
/**
 * Reads the worker information of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1059
/**
 * Reads the worker information of the worker node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker information.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found.
 */
protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey !== -1) {
    return this.workerNodes[workerNodeKey].info
  }
  throw new Error('Worker not found')
}
1074
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private addWorkerNode (worker: Worker): number {
  const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Worker nodes created at pool startup are flagged ready right away.
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  return this.workerNodes.length
}
1089
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Unknown workers are ignored.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1102
/**
 * Runs the pre-execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1113
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1117
/**
 * Takes a task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `dequeueTask()`, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1121
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1125
/**
 * Executes every queued task of the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1135
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1141
/**
 * Sends to the given worker the statistics settings: whether the current
 * worker choice strategy requires run time and ELU to be aggregated.
 *
 * @param worker - The worker which should receive the statistics settings.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfoByWorker(worker).id as number
  this.sendToWorker(worker, {
    statistics: { runTime, elu },
    workerId
  })
}
1154 }