1222d686b847df02f2f9f90adced68d3cebb6b8f
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isAsyncFunction,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions awaiting a worker response.
 *
 * - `key`: The message id of the submitted task.
 * - `value`: The promise `resolve`/`reject` callbacks together with the key of the worker node the task was assigned to.
 *
 * The entry is looked up by message id when a task execution response is received from a worker.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map()

/**
 * Worker choice strategy context used to pick the worker node for each task.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * `true` only while the constructor is creating the initial worker nodes.
 */
private readonly starting: boolean

/**
 * `performance.now()` value recorded once the initial worker nodes have been created.
 */
private readonly startTimestamp: number
95
/**
 * Builds the pool: validates the arguments, wires the optional event emitter and
 * the worker choice strategy context, then creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If called from a worker or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation (pool options defaults are also filled in here).
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods so that they can be passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions)

  this.setupHook()

  // The `starting` flag is raised only for the duration of the initial worker nodes creation.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
142
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank, or does not exist on disk.
 */
private checkFilePath (filePath: string): void {
  // `null`/`undefined` are rejected by the `typeof` test as well.
  const missingOrBlank =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (missingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const found = existsSync(filePath)
  if (!found) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
155
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
173
/**
 * Validates the size boundaries of a dynamic pool. Does nothing for any other pool type.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `max` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `max` is zero, lower than `min`, or equal to `min`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
195
/**
 * Validates the pool options and stores the effective values (with defaults applied) in `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy, round robin by default.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const requestedTasksQueueOptions =
      opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(requestedTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      requestedTasksQueueOptions
    )
  }
}
222
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
232
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, exactly one is expected per possible worker node.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
260
/**
 * Validates the tasks queue options. `null`/`undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
284
/**
 * Creates worker nodes until the number of non dynamic worker nodes reaches `numberOfWorkers`.
 */
private startPool (): void {
  const nonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
296
/**
 * Snapshot of the pool state and aggregated task statistics.
 *
 * `utilization`, `runTime` and `waitTime` are only included when the current worker
 * choice strategy requires the corresponding measurements to be aggregated.
 * Averages are reported as `0` (instead of `NaN`/`Infinity`) while no task has been executed yet.
 *
 * @inheritDoc
 */
public get info (): PoolInfo {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a per worker node number over all the worker nodes.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )
  // Average of an aggregated measurement per executed task, guarded against a division by zero.
  const averagePerExecutedTask = (aggregate: number): number => {
    const executedTasks = sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.executed ?? 0
    )
    return executedTasks > 0 ? round(aggregate / executedTasks) : 0
  }
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averagePerExecutedTask(
          sumOverWorkerNodes(
            workerNode => workerNode.usage.runTime?.aggregate ?? 0
          )
        ),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averagePerExecutedTask(
          sumOverWorkerNodes(
            workerNode => workerNode.usage.waitTime?.aggregate ?? 0
          )
        ),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
433
/**
 * Whether the pool is ready: at least `minSize` non dynamic worker nodes are flagged as ready.
 */
private get ready (): boolean {
  const readyWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyWorkerNodes.length >= this.minSize
}
448
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time, divided by
 * the time elapsed since the pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
469
/**
 * The pool type, compared against `PoolTypes.fixed` and `PoolTypes.dynamic` by this class.
 */
protected abstract get type (): PoolType

/**
 * The type of the workers managed by the pool.
 */
protected abstract get worker (): WorkerType

/**
 * The minimum number of worker nodes in the pool.
 */
protected abstract get minSize (): number

/**
 * The maximum number of worker nodes in the pool.
 */
protected abstract get maxSize (): number
491
/**
 * Ensures that a worker id carried by a received message belongs to one of the pool worker nodes.
 * Messages without a worker id are accepted.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
508
/**
 * Looks up the worker node key (index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
520
/**
 * Looks up the worker node key (index in `workerNodes`) of the worker with the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined {
  const workerNodeKey = this.workerNodes.findIndex(
    workerNode => workerNode.info.id === workerId
  )
  return workerNodeKey !== -1 ? workerNodeKey : undefined
}
534
/**
 * Switches the worker choice strategy, optionally applies new strategy options, then
 * resets every worker node usage and sends each worker the statistics message again.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  for (let key = 0; key < this.workerNodes.length; key++) {
    this.workerNodes[key].resetUsage()
    this.sendWorkerStatisticsMessageToWorker(key)
  }
}
553
/**
 * Validates the given worker choice strategy options, stores them in the pool options
 * and hands them over to the worker choice strategy context.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
564
/**
 * Enables or disables the tasks queue. Queued tasks are flushed when an enabled queue gets disabled.
 *
 * @inheritDoc
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
576
/**
 * Validates and stores the tasks queue options when the tasks queue is enabled,
 * removes any stored tasks queue options otherwise.
 *
 * @inheritDoc
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
587
/**
 * Builds the effective tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The requested tasks queue options (may be nullish).
 * @returns The tasks queue options with every field set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
595
/**
 * Whether the pool is full or not: the number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
604
/**
 * Whether the pool is busy or not.
 *
 * Read before submitting a task (to decide whether it must be queued) and when emitting the `busy` event.
 */
protected abstract get busy (): boolean
611
/**
 * Whether all the worker nodes are busy, i.e. none of them has zero task executing.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
624
/**
 * Submits a task: chooses a worker node, registers the response promise callbacks, then
 * either queues the task on the worker node or sends it for execution right away.
 *
 * @inheritDoc
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const submissionTimestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp: submissionTimestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      id: randomUUID()
    }
    // The callbacks are retrieved by task id when the worker response is received.
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Queue the task when queueing is enabled and either the pool is busy or
    // the chosen worker node already executes `concurrency` tasks.
    const shallEnqueue =
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[workerNodeKey].usage.tasks.executing >=
          ((this.opts.tasksQueueOptions as TasksQueueOptions)
            .concurrency as number))
    if (shallEnqueue) {
      this.enqueueTask(workerNodeKey, task)
    } else {
      this.executeTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
657
/**
 * Destroys every worker node: flushes its tasks queue, requests its termination
 * and waits for the worker `exit` event.
 *
 * @inheritDoc
 */
public async destroy (): Promise<void> {
  await Promise.all(
    this.workerNodes.map(async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      const workerExitPromise = new Promise<void>(resolve => {
        // One-shot listener (like the worker node removal listener): it only
        // serves to settle this promise and must not outlive it.
        workerNode.worker.once('exit', () => {
          resolve()
        })
      })
      await this.destroyWorkerNode(workerNodeKey)
      await workerExitPromise
    })
  )
}
674
/**
 * Terminates the worker node given its worker node key.
 *
 * Implementations may be synchronous or return a promise; callers in this class handle both.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (
  workerNodeKey: number
): void | Promise<void>
683
/**
 * Hook called by the abstract constructor right before the initial worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default setup.
}
693
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
698
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and records the task wait time, both on the
 * worker node usage and on its per task name usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const trackTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  trackTaskStart(this.workerNodes[workerNodeKey].usage)
  trackTaskStart(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      task.name as string
    ) as WorkerUsage
  )
}
719
/**
 * Hook executed after the worker task execution.
 * Updates the tasks counters, run time and event loop utilization statistics, both on the
 * worker node usage and on its per task name usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const trackTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  trackTaskEnd(this.workerNodes[workerNodeKey].usage)
  // Falls back to the default task name when the message carries no task performance name.
  trackTaskEnd(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
742
/**
 * Updates the tasks counters of a worker usage once a task execution response is received:
 * one less executing task, and one more executed or failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received task execution response message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const workerTaskStatistics = workerUsage.tasks
  // The usage may have been reset (e.g. on worker choice strategy change) while
  // the task was in flight: never let the executing counter go negative, since
  // idle/busy detection relies on it being exactly zero for an idle worker node.
  if (workerTaskStatistics.executing > 0) {
    --workerTaskStatistics.executing
  }
  if (message.taskError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
755
/**
 * Feeds the task run time reported in the message (`0` when absent) into the worker usage run time statistics.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received task execution response message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
767
/**
 * Feeds the task wait time (now minus the task timestamp, `0` when the task has no timestamp)
 * into the worker usage wait time statistics.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - (task.timestamp ?? now),
    workerUsage.tasks.executed
  )
}
781
/**
 * Feeds the event loop utilization (ELU) reported in the message into the worker usage
 * ELU statistics. When ELU aggregation is required and the message carries ELU data, the
 * stored utilization becomes the mean of its previous value and the reported one
 * (or the reported one if none was stored yet).
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received task execution response message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
813
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions are met; it is returned
 * directly when the strategy policy asks to use dynamic workers, otherwise the choice
 * is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const { useDynamicWorker } =
      this.workerChoiceStrategyContext.getStrategyPolicy()
    if (useDynamicWorker) {
      return dynamicWorkerNodeKey
    }
  }
  return this.workerChoiceStrategyContext.execute()
}
832
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and all its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
841
/**
 * Sends a message to the worker of the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): void
852
/**
 * Creates a new worker. The returned worker is wrapped in a worker node by the caller.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
859
/**
 * Creates a worker, registers the user supplied and internal event handlers on it,
 * adds it to the pool worker nodes and runs the after-setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKey(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // Replace the failed worker node with one of the same kind, except during pool startup.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker no longer belongs to the pool.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
899
/**
 * Creates a worker node, flags it as dynamic and registers a listener that destroys it
 * upon a kill message from its worker: always for a hard kill behavior, otherwise only
 * when the worker node executes no task (and has no queued task if the tasks queue is enabled).
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    // Worker node keys may change over time: resolve the key again from the worker id.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    ) as number
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    const hasNoPendingTask = (): boolean =>
      workerUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && hasNoPendingTask())
    if (!shallDestroy) {
      return
    }
    // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
    const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
    if (isAsyncFunction(destroyWorkerNodeBounded)) {
      (
        destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
      )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
    } else {
      (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
        localWorkerNodeKey
      )
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.dynamic = true
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    workerInfo.ready = true
  }
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  return workerNodeKey
}
945
/**
 * Registers a message listener callback on the worker of the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The callback invoked with each message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
958
/**
 * Hook run once a worker node has been newly created and added to the pool.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // 1. Start listening to the worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, listener)
  // 2. Send the startup message to the worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // 3. Tell the worker which statistics are required.
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
973
/**
 * Sends the startup message to the worker of the worker node with the given key.
 *
 * Called from the after worker node setup hook, once the message listener is registered.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
980
/**
 * Sends the worker statistics message to the worker of the worker node with the given key:
 * whether run time and event loop utilization aggregation is required by the current strategy.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId: this.getWorkerInfo(workerNodeKey).id as number
  })
}
998
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task the destination is the first other ready worker node with an
 * empty queue (the task is executed right away if that node executes nothing, queued
 * otherwise), or failing that the other ready worker node with the fewest queued tasks.
 * When no other ready worker node exists, redistribution stops and the remaining tasks
 * are left on the source queue.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const { queued, executing } = workerNode.usage.tasks
      if (queued === 0) {
        // Best candidate: nothing queued. Execute directly if it is also idle.
        executeTask = executing === 0
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queued < minQueuedTasks) {
        minQueuedTasks = queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey == null) {
      // No other ready worker node: re-queueing on the source node would never
      // shrink its queue and this loop would spin forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1039
/**
 * Builds the listener registered for each worker message. The listener validates the
 * message worker id, then dispatches on the message content: worker ready response
 * (`ready` set) or task execution response (`id` set). Other messages are ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      this.handleTaskExecutionResponse(message)
    }
  }
}
1057
/**
 * Stores the readiness reported by a worker in its worker node info, then emits the
 * `ready` pool event if events are enabled and the pool is ready.
 *
 * @param message - The received worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(
    message.workerId
  ) as number
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1066
/**
 * Settles the promise of the task the message responds to, updates the worker node
 * statistics, starts the next queued task of that worker node (if any) and notifies
 * the worker choice strategy context. Messages for unknown task ids are ignored.
 *
 * @param message - The received task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    promiseResponse.reject(message.taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1091
/**
 * Emits the `busy` event when the pool is busy and the `full` event when a dynamic pool is full.
 * Does nothing when events are disabled.
 */
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1102
/**
 * Returns the `info` object of the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1112
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Worker nodes created at pool startup are flagged as ready right away.
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKey(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker node not found')
  }
  return addedWorkerNodeKey
}
1133
/**
 * Removes the worker node of the given worker from the pool worker nodes and from the
 * worker choice strategy context. Does nothing if the worker is not in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKey(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
1146
/**
 * Runs the before-execution hook, then sends the task to the worker of the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // Statistics must be updated before the task leaves for the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task)
}
1157
/**
 * Adds the task to the tasks queue of the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1161
/**
 * Takes a task out of the tasks queue of the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `dequeueTask()`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1165
/**
 * Gets the tasks queue size of the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1169
/**
 * Sends every task queued on the worker node with the given key for execution on that
 * same worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1179
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (let key = 0; key < this.workerNodes.length; key++) {
    this.flushTasksQueue(key)
  }
}
1185 }