56dd7fa9296265078f863d342338da0457309919
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isAsyncFunction,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first, then the scheduling methods are bound to
 * the instance, the optional emitter and the worker choice strategy context
 * are created, the setup hook is run and the worker nodes are started.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if one of the arguments is rejected by the checks.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const startedFromMain = this.isMain()
  if (!startedFromMain) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation (the pool options check also fills in the defaults).
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // The events emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  // Keep `this` attached when these methods are passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // `starting` is only true while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
142
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  const isMissingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (isMissingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (existsSync(filePath)) {
    return
  }
  throw new Error(`Cannot find the worker file '${filePath}'`)
}
155
/**
 * Checks the number of workers given at pool construction.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
173
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum pool size is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum is below the minimum, is zero, or equals the minimum.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
199
/**
 * Checks the pool options and stores the defaulted values in `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy, round robin when unspecified.
  const strategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = strategy
  this.checkValidWorkerChoiceStrategy(strategy)

  // Worker choice strategy options.
  const strategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = strategyOptions
  this.checkValidWorkerChoiceStrategyOptions(strategyOptions)

  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options only matter when the tasks queue is enabled.
  if (!this.opts.enableTasksQueue) {
    return
  }
  const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(queueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
}
226
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
236
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, there must be exactly one per possible worker node.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
264
/**
 * Checks the tasks queue options. Missing options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to validate without a concurrency.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
288
/**
 * Creates worker nodes until the number of non dynamic worker nodes reaches `numberOfWorkers`.
 */
private startPool (): void {
  const nonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
300
/** @inheritDoc */
public get info (): PoolInfo {
  // Task statistics requirements of the current worker choice strategy,
  // read once and reused for every conditional section below.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a numeric value picked on each worker node.
  const sumOver = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countWhere = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(predicate).length
  // Builds the pool wide statistics of one measurement.
  const measurementInfo = (
    measurement: 'runTime' | 'waitTime'
  ): {
    minimum: number
    maximum: number
    average: number
    median?: number
  } => {
    const aggregate = sumOver(
      workerNode => workerNode.usage[measurement]?.aggregate ?? 0
    )
    const executedTasks = sumOver(
      workerNode => workerNode.usage.tasks?.executed ?? 0
    )
    return {
      minimum: round(
        Math.min(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        Math.max(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
          )
        )
      ),
      // Before any task has been executed the division would be 0 / 0 (NaN):
      // report 0 instead.
      average: round(executedTasks > 0 ? aggregate / executedTasks : 0),
      ...(taskStatisticsRequirements[measurement].median && {
        median: round(
          median(
            this.workerNodes.map(
              workerNode => workerNode.usage[measurement]?.median ?? 0
            )
          )
        )
      })
    }
  }
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both the run time and the wait time aggregates.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWhere(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWhere(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOver(workerNode => workerNode.usage.tasks.executed),
    executingTasks: sumOver(workerNode => workerNode.usage.tasks.executing),
    queuedTasks: sumOver(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sumOver(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOver(workerNode => workerNode.usage.tasks.failed),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: measurementInfo('runTime')
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: measurementInfo('waitTime')
    })
  }
}
437
/**
 * The pool readiness boolean status: `true` once at least `minSize` non dynamic worker nodes are ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
452
/**
 * The approximate pool utilization: the summed tasks run and wait times
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
473
474 /**
475 * The pool type.
476 *
477 * If it is `'dynamic'`, it provides the `max` property.
478 */
479 protected abstract get type (): PoolType
480
481 /**
482 * The worker type.
483 */
484 protected abstract get worker (): WorkerType
485
486 /**
487 * The pool minimum size.
488 */
489 protected abstract get minSize (): number
490
491 /**
492 * The pool maximum size.
493 */
494 protected abstract get maxSize (): number
495
/**
 * Checks that the worker id carried by a received message, if any, belongs to a worker node of the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    // Messages without a worker id are not checked.
    return
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
512
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
524
/**
 * Gets the worker node key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
536
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and send it the statistics message again.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  })
}
555
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options are never stored nor propagated.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  const { workerChoiceStrategyOptions: storedOptions } = this.opts
  this.workerChoiceStrategyContext.setOptions(storedOptions)
}
566
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Flush the tasks queues before the tasks queue gets disabled.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
578
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
589
/**
 * Builds the tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly missing.
 * @returns The tasks queue options with the concurrency always defined.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
597
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
606
607 /**
608 * Whether the pool is busy or not.
609 *
610 * The pool busyness boolean status.
611 */
612 protected abstract get busy (): boolean
613
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` when no worker node is idle, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
626
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const submissionTimestamp = performance.now()
    const chosenWorkerNodeKey = this.chooseWorkerNode()
    const submittedTask: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp: submissionTimestamp,
      workerId: this.getWorkerInfo(chosenWorkerNodeKey).id as number,
      id: randomUUID()
    }
    // Keep the promise callbacks so the worker response can settle the promise.
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      workerNodeKey: chosenWorkerNodeKey
    })
    // With the tasks queue enabled, the task is queued when the pool is busy or
    // when the chosen worker node already runs `concurrency` tasks.
    const shallEnqueue =
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[chosenWorkerNodeKey].usage.tasks.executing >=
          (this.opts.tasksQueueOptions?.concurrency as number))
    if (shallEnqueue) {
      this.enqueueTask(chosenWorkerNodeKey, submittedTask)
    } else {
      this.executeTask(chosenWorkerNodeKey, submittedTask)
    }
    this.checkAndEmitEvents()
  })
}
658
/** @inheritDoc */
public async destroy (): Promise<void> {
  const workerNodeTerminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // The listener is registered before the destruction is requested.
      const workerExited = new Promise<void>(resolve => {
        workerNode.worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorkerNode(workerNodeKey)
      await workerExited
    }
  )
  await Promise.all(workerNodeTerminations)
}
675
676 /**
677 * Terminates the worker node given its worker node key.
678 *
679 * @param workerNodeKey - The worker node key.
680 */
681 protected abstract destroyWorkerNode (
682 workerNodeKey: number
683 ): void | Promise<void>
684
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default; subclasses can override it.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
694
695 /**
696 * Should return whether the worker is the main worker or not.
697 */
698 protected abstract isMain (): boolean
699
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time statistics,
 * both in the worker node usage and in its per task name usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Worker node wide usage.
  const nodeUsage = this.workerNodes[workerNodeKey].usage
  ++nodeUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(nodeUsage, task)
  // Usage restricted to the tasks sharing the task name.
  const namedTaskUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  ++namedTaskUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(namedTaskUsage, task)
}
720
/**
 * Hook executed after the worker task execution.
 * Updates the tasks, run time and ELU statistics, both in the worker node
 * usage and in its per task name usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Worker node wide usage.
  const nodeUsage = this.workerNodes[workerNodeKey].usage
  this.updateTaskStatisticsWorkerUsage(nodeUsage, message)
  this.updateRunTimeWorkerUsage(nodeUsage, message)
  this.updateEluWorkerUsage(nodeUsage, message)
  // Usage restricted to the task name, the default task name when unknown.
  const taskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  const namedTaskUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
    taskName
  ) as WorkerUsage
  this.updateTaskStatisticsWorkerUsage(namedTaskUsage, message)
  this.updateRunTimeWorkerUsage(namedTaskUsage, message)
  this.updateEluWorkerUsage(namedTaskUsage, message)
}
743
/**
 * Updates the tasks counters of a worker usage once a task response is received:
 * one executing task less and one more failed or executed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
756
/**
 * Updates the run time statistics of a worker usage with the run time carried
 * by the received message, `0` when the message has none.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { runTime: runTimeStatistics } = workerUsage
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    runTimeStatistics,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
768
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp, `0` when the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const elapsedSinceSubmission = now - (task.timestamp ?? now)
  const { waitTime: waitTimeStatistics } = workerUsage
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    waitTimeStatistics,
    waitTimeRequirements,
    elapsedSinceSubmission,
    workerUsage.tasks.executed
  )
}
782
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with
 * the values carried by the received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  // The utilization is only maintained when the aggregate is required and the
  // message carries ELU data.
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // First value is taken as is, then it is averaged with each new value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
814
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions are met; it is
 * chosen directly if the strategy policy uses dynamic workers, otherwise the
 * worker choice strategy decides.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
833
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * all its worker nodes are executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
842
843 /**
844 * Sends a message to worker given its worker node key.
845 *
846 * @param workerNodeKey - The worker node key.
847 * @param message - The message.
848 */
849 protected abstract sendToWorker (
850 workerNodeKey: number,
851 message: MessageValue<Data>
852 ): void
853
854 /**
855 * Creates a new worker.
856 *
857 * @returns Newly created worker.
858 */
859 protected abstract createWorker (): Worker
860
/**
 * Creates a new, completely set up worker node: the worker is created, the
 * user and internal event handlers are registered, the worker is added to the
 * pool worker nodes and the after setup hook is run.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers, defaulting to a no-op.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling.
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No replacement while the pool is still starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestart) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool when its worker exits.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(createdWorkerNodeKey)
  return createdWorkerNodeKey
}
900
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a message listener destroys the worker node
 * when a kill message is received, the `checkActive` message is sent to the
 * worker and the worker node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const dynamicWorkerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(dynamicWorkerNodeKey, message => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // A non hard kill is only honored when the worker node has nothing left to
    // do: no executing task and, with the tasks queue enabled, no queued task.
    const softKillAllowed = (): boolean => {
      if (message.kill == null || senderUsage.tasks.executing !== 0) {
        return false
      }
      return (
        this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0)
      )
    }
    if (
      !isKillBehavior(KillBehaviors.HARD, message.kill) &&
      !softKillAllowed()
    ) {
      return
    }
    // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
    const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
    if (isAsyncFunction(destroyWorkerNodeBounded)) {
      const destroyAsync = destroyWorkerNodeBounded as (
        workerNodeKey: number
      ) => Promise<void>
      destroyAsync(senderWorkerNodeKey).catch(EMPTY_FUNCTION)
    } else {
      const destroySync = destroyWorkerNodeBounded as (
        workerNodeKey: number
      ) => void
      destroySync(senderWorkerNodeKey)
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(dynamicWorkerNodeKey)
  this.sendToWorker(dynamicWorkerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  // Flagged ready right away when the strategy policy uses dynamic workers.
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    dynamicWorkerInfo.ready = true
  }
  return dynamicWorkerNodeKey
}
946
947 /**
948 * Registers a listener callback on the worker given its worker node key.
949 *
950 * @param workerNodeKey - The worker node key.
951 * @param listener - The message listener callback.
952 */
953 protected abstract registerWorkerMessageListener<
954 Message extends Data | Response
955 >(
956 workerNodeKey: number,
957 listener: (message: MessageValue<Message>) => void
958 ): void
959
/**
 * Method hooked up after a worker node has been newly created.
 * Registers the pool message listener on the worker, then sends it the
 * startup message and the worker statistics message.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, messageListener)
  // Send the startup message, then the worker statistics message, to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
974
975 /**
976 * Sends the startup message to worker given its worker node key.
977 *
978 * @param workerNodeKey - The worker node key.
979 */
980 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
981
/**
 * Sends the worker statistics message to worker given its worker node key.
 * The message carries the run time and ELU aggregate requirements of the
 * current worker choice strategy.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu },
    workerId
  })
}
999
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the first other ready worker node with an empty queue
 * is preferred (the task is executed right away when that node runs fewer
 * tasks than the configured concurrency); otherwise the ready worker node
 * with the fewest queued tasks is used.
 *
 * When no other ready worker node exists, the remaining tasks are left in the
 * source queue: re-enqueueing them on the source node would never shrink its
 * queue and the loop would never end.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other, ready worker nodes are eligible.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        executeTask =
          workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey == null) {
      // No eligible worker node: stop instead of looping forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1043
/**
 * Builds the listener registered for each worker message. The listener checks
 * the message worker id, then handles either a worker ready response or a
 * task execution response.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1061
/**
 * Handles a worker ready response: stores the readiness in the worker info
 * and emits the pool ready event once the pool is ready.
 *
 * @param message - The received message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1070
/**
 * Handles a task execution response.
 *
 * Does nothing if no promise response is registered for `message.id`. Otherwise:
 * - rejects the promise with the task error message (after emitting `PoolEvents.taskError`
 *   when an emitter is defined) or resolves it with `message.data`;
 * - runs the after task execution hook and unregisters the promise response;
 * - if tasks queueing is enabled, the worker node tasks queue is not empty and the worker node
 *   executes fewer tasks than the configured concurrency, executes the next queued task;
 * - updates the worker choice strategy context for the worker node.
 *
 * @param message - The message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  // Evaluated left to right with short-circuiting, exactly once.
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1097
/**
 * Emits pool state events, if an emitter is defined.
 *
 * Emits `PoolEvents.busy` when the pool `busy` property is true, and `PoolEvents.full` when
 * the pool type is dynamic and its `full` property is true. Both events carry the pool
 * information.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicPool = this.type === PoolTypes.dynamic
  if (isDynamicPool && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1108
/**
 * Looks up the information of the worker node stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key, i.e. its index in the pool worker nodes.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1118
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
1139
/**
 * Removes the worker node of the given worker from the pool worker nodes and from the
 * worker choice strategy context. Does nothing if the worker has no worker node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1152
/**
 * Runs the before task execution hook for the given task, then sends the task to the worker
 * stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The hook must run before the task is handed over to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task)
}
1163
/**
 * Enqueues the given task in the tasks queue of the worker node at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` method.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1167
/**
 * Dequeues a task from the tasks queue of the worker node at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1171
/**
 * Gets the tasks queue size of the worker node at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1175
/**
 * Flushes the tasks queue of the worker node at the given worker node key: executes every
 * queued task until the queue size drops to zero, then clears the queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // The queue size is re-read after each execution rather than cached up front.
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1185
/**
 * Flushes the tasks queue of every worker node in the pool, in worker node key order.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1191 }