dde31ea49f20d7ef4a14015c3faec458683b9c2a
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isAsyncFunction,
14 isKillBehavior,
15 isPlainObject,
16 median,
17 round,
18 updateMeasurementStatistics
19 } from '../utils'
20 import { KillBehaviors } from '../worker/worker-options'
21 import {
22 type IPool,
23 PoolEmitter,
24 PoolEvents,
25 type PoolInfo,
26 type PoolOptions,
27 type PoolType,
28 PoolTypes,
29 type TasksQueueOptions
30 } from './pool'
31 import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The task execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
96 /**
97 * Constructs a new poolifier pool.
98 *
99 * @param numberOfWorkers - Number of workers that this pool should manage.
100 * @param filePath - Path to the worker file.
101 * @param opts - Options for the pool.
102 */
103 public constructor (
104 protected readonly numberOfWorkers: number,
105 protected readonly filePath: string,
106 protected readonly opts: PoolOptions<Worker>
107 ) {
108 if (!this.isMain()) {
109 throw new Error('Cannot start a pool from a worker!')
110 }
111 this.checkNumberOfWorkers(this.numberOfWorkers)
112 this.checkFilePath(this.filePath)
113 this.checkPoolOptions(this.opts)
114
115 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
116 this.executeTask = this.executeTask.bind(this)
117 this.enqueueTask = this.enqueueTask.bind(this)
118 this.dequeueTask = this.dequeueTask.bind(this)
119 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
120
121 if (this.opts.enableEvents === true) {
122 this.emitter = new PoolEmitter()
123 }
124 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
125 Worker,
126 Data,
127 Response
128 >(
129 this,
130 this.opts.workerChoiceStrategy,
131 this.opts.workerChoiceStrategyOptions
132 )
133
134 this.setupHook()
135
136 this.starting = true
137 this.startPool()
138 this.starting = false
139
140 this.startTimestamp = performance.now()
141 }
142
143 private checkFilePath (filePath: string): void {
144 if (
145 filePath == null ||
146 typeof filePath !== 'string' ||
147 (typeof filePath === 'string' && filePath.trim().length === 0)
148 ) {
149 throw new Error('Please specify a file with a worker implementation')
150 }
151 if (!existsSync(filePath)) {
152 throw new Error(`Cannot find the worker file '${filePath}'`)
153 }
154 }
155
156 private checkNumberOfWorkers (numberOfWorkers: number): void {
157 if (numberOfWorkers == null) {
158 throw new Error(
159 'Cannot instantiate a pool without specifying the number of workers'
160 )
161 } else if (!Number.isSafeInteger(numberOfWorkers)) {
162 throw new TypeError(
163 'Cannot instantiate a pool with a non safe integer number of workers'
164 )
165 } else if (numberOfWorkers < 0) {
166 throw new RangeError(
167 'Cannot instantiate a pool with a negative number of workers'
168 )
169 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
170 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
171 }
172 }
173
174 protected checkDynamicPoolSize (min: number, max: number): void {
175 if (this.type === PoolTypes.dynamic) {
176 if (max == null) {
177 throw new Error(
178 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
179 )
180 } else if (!Number.isSafeInteger(max)) {
181 throw new TypeError(
182 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
183 )
184 } else if (min > max) {
185 throw new RangeError(
186 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
187 )
188 } else if (max === 0) {
189 throw new RangeError(
190 'Cannot instantiate a dynamic pool with a pool size equal to zero'
191 )
192 } else if (min === max) {
193 throw new RangeError(
194 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
195 )
196 }
197 }
198 }
199
200 private checkPoolOptions (opts: PoolOptions<Worker>): void {
201 if (isPlainObject(opts)) {
202 this.opts.workerChoiceStrategy =
203 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
204 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
205 this.opts.workerChoiceStrategyOptions =
206 opts.workerChoiceStrategyOptions ??
207 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
208 this.checkValidWorkerChoiceStrategyOptions(
209 this.opts.workerChoiceStrategyOptions
210 )
211 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
212 this.opts.enableEvents = opts.enableEvents ?? true
213 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
214 if (this.opts.enableTasksQueue) {
215 this.checkValidTasksQueueOptions(
216 opts.tasksQueueOptions as TasksQueueOptions
217 )
218 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
219 opts.tasksQueueOptions as TasksQueueOptions
220 )
221 }
222 } else {
223 throw new TypeError('Invalid pool options: must be a plain object')
224 }
225 }
226
227 private checkValidWorkerChoiceStrategy (
228 workerChoiceStrategy: WorkerChoiceStrategy
229 ): void {
230 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
231 throw new Error(
232 `Invalid worker choice strategy '${workerChoiceStrategy}'`
233 )
234 }
235 }
236
237 private checkValidWorkerChoiceStrategyOptions (
238 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
239 ): void {
240 if (!isPlainObject(workerChoiceStrategyOptions)) {
241 throw new TypeError(
242 'Invalid worker choice strategy options: must be a plain object'
243 )
244 }
245 if (
246 workerChoiceStrategyOptions.weights != null &&
247 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
248 ) {
249 throw new Error(
250 'Invalid worker choice strategy options: must have a weight for each worker node'
251 )
252 }
253 if (
254 workerChoiceStrategyOptions.measurement != null &&
255 !Object.values(Measurements).includes(
256 workerChoiceStrategyOptions.measurement
257 )
258 ) {
259 throw new Error(
260 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
261 )
262 }
263 }
264
265 private checkValidTasksQueueOptions (
266 tasksQueueOptions: TasksQueueOptions
267 ): void {
268 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
269 throw new TypeError('Invalid tasks queue options: must be a plain object')
270 }
271 if (
272 tasksQueueOptions?.concurrency != null &&
273 !Number.isSafeInteger(tasksQueueOptions.concurrency)
274 ) {
275 throw new TypeError(
276 'Invalid worker tasks concurrency: must be an integer'
277 )
278 }
279 if (
280 tasksQueueOptions?.concurrency != null &&
281 tasksQueueOptions.concurrency <= 0
282 ) {
283 throw new Error(
284 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
285 )
286 }
287 }
288
289 private startPool (): void {
290 while (
291 this.workerNodes.reduce(
292 (accumulator, workerNode) =>
293 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
294 0
295 ) < this.numberOfWorkers
296 ) {
297 this.createAndSetupWorkerNode()
298 }
299 }
300
301 /** @inheritDoc */
302 public get info (): PoolInfo {
303 return {
304 version,
305 type: this.type,
306 worker: this.worker,
307 ready: this.ready,
308 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
309 minSize: this.minSize,
310 maxSize: this.maxSize,
311 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
312 .runTime.aggregate &&
313 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
314 .waitTime.aggregate && { utilization: round(this.utilization) }),
315 workerNodes: this.workerNodes.length,
316 idleWorkerNodes: this.workerNodes.reduce(
317 (accumulator, workerNode) =>
318 workerNode.usage.tasks.executing === 0
319 ? accumulator + 1
320 : accumulator,
321 0
322 ),
323 busyWorkerNodes: this.workerNodes.reduce(
324 (accumulator, workerNode) =>
325 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
326 0
327 ),
328 executedTasks: this.workerNodes.reduce(
329 (accumulator, workerNode) =>
330 accumulator + workerNode.usage.tasks.executed,
331 0
332 ),
333 executingTasks: this.workerNodes.reduce(
334 (accumulator, workerNode) =>
335 accumulator + workerNode.usage.tasks.executing,
336 0
337 ),
338 queuedTasks: this.workerNodes.reduce(
339 (accumulator, workerNode) =>
340 accumulator + workerNode.usage.tasks.queued,
341 0
342 ),
343 maxQueuedTasks: this.workerNodes.reduce(
344 (accumulator, workerNode) =>
345 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
346 0
347 ),
348 failedTasks: this.workerNodes.reduce(
349 (accumulator, workerNode) =>
350 accumulator + workerNode.usage.tasks.failed,
351 0
352 ),
353 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
354 .runTime.aggregate && {
355 runTime: {
356 minimum: round(
357 Math.min(
358 ...this.workerNodes.map(
359 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
360 )
361 )
362 ),
363 maximum: round(
364 Math.max(
365 ...this.workerNodes.map(
366 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
367 )
368 )
369 ),
370 average: round(
371 this.workerNodes.reduce(
372 (accumulator, workerNode) =>
373 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
374 0
375 ) /
376 this.workerNodes.reduce(
377 (accumulator, workerNode) =>
378 accumulator + (workerNode.usage.tasks?.executed ?? 0),
379 0
380 )
381 ),
382 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
383 .runTime.median && {
384 median: round(
385 median(
386 this.workerNodes.map(
387 workerNode => workerNode.usage.runTime?.median ?? 0
388 )
389 )
390 )
391 })
392 }
393 }),
394 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
395 .waitTime.aggregate && {
396 waitTime: {
397 minimum: round(
398 Math.min(
399 ...this.workerNodes.map(
400 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
401 )
402 )
403 ),
404 maximum: round(
405 Math.max(
406 ...this.workerNodes.map(
407 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
408 )
409 )
410 ),
411 average: round(
412 this.workerNodes.reduce(
413 (accumulator, workerNode) =>
414 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
415 0
416 ) /
417 this.workerNodes.reduce(
418 (accumulator, workerNode) =>
419 accumulator + (workerNode.usage.tasks?.executed ?? 0),
420 0
421 )
422 ),
423 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
424 .waitTime.median && {
425 median: round(
426 median(
427 this.workerNodes.map(
428 workerNode => workerNode.usage.waitTime?.median ?? 0
429 )
430 )
431 )
432 })
433 }
434 })
435 }
436 }
437
438 /**
439 * The pool readiness boolean status.
440 */
441 private get ready (): boolean {
442 return (
443 this.workerNodes.reduce(
444 (accumulator, workerNode) =>
445 !workerNode.info.dynamic && workerNode.info.ready
446 ? accumulator + 1
447 : accumulator,
448 0
449 ) >= this.minSize
450 )
451 }
452
453 /**
454 * The approximate pool utilization.
455 *
456 * @returns The pool utilization.
457 */
458 private get utilization (): number {
459 const poolTimeCapacity =
460 (performance.now() - this.startTimestamp) * this.maxSize
461 const totalTasksRunTime = this.workerNodes.reduce(
462 (accumulator, workerNode) =>
463 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
464 0
465 )
466 const totalTasksWaitTime = this.workerNodes.reduce(
467 (accumulator, workerNode) =>
468 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
469 0
470 )
471 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
472 }
473
474 /**
475 * The pool type.
476 *
477 * If it is `'dynamic'`, it provides the `max` property.
478 */
479 protected abstract get type (): PoolType
480
481 /**
482 * The worker type.
483 */
484 protected abstract get worker (): WorkerType
485
486 /**
487 * The pool minimum size.
488 */
489 protected abstract get minSize (): number
490
491 /**
492 * The pool maximum size.
493 */
494 protected abstract get maxSize (): number
495
496 /**
497 * Checks if the worker id sent in the received message from a worker is valid.
498 *
499 * @param message - The received message.
500 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
501 */
502 private checkMessageWorkerId (message: MessageValue<Response>): void {
503 if (
504 message.workerId != null &&
505 this.getWorkerNodeKeyByWorkerId(message.workerId) == null
506 ) {
507 throw new Error(
508 `Worker message received from unknown worker '${message.workerId}'`
509 )
510 }
511 }
512
513 /**
514 * Gets the given worker its worker node key.
515 *
516 * @param worker - The worker.
517 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
518 */
519 private getWorkerNodeKey (worker: Worker): number {
520 return this.workerNodes.findIndex(
521 workerNode => workerNode.worker === worker
522 )
523 }
524
525 /**
526 * Gets the worker node key given its worker id.
527 *
528 * @param workerId - The worker id.
529 * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise.
530 */
531 private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined {
532 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
533 if (workerNode.info.id === workerId) {
534 return workerNodeKey
535 }
536 }
537 }
538
539 /** @inheritDoc */
540 public setWorkerChoiceStrategy (
541 workerChoiceStrategy: WorkerChoiceStrategy,
542 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
543 ): void {
544 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
545 this.opts.workerChoiceStrategy = workerChoiceStrategy
546 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
547 this.opts.workerChoiceStrategy
548 )
549 if (workerChoiceStrategyOptions != null) {
550 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
551 }
552 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
553 workerNode.resetUsage()
554 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
555 }
556 }
557
558 /** @inheritDoc */
559 public setWorkerChoiceStrategyOptions (
560 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
561 ): void {
562 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
563 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
564 this.workerChoiceStrategyContext.setOptions(
565 this.opts.workerChoiceStrategyOptions
566 )
567 }
568
569 /** @inheritDoc */
570 public enableTasksQueue (
571 enable: boolean,
572 tasksQueueOptions?: TasksQueueOptions
573 ): void {
574 if (this.opts.enableTasksQueue === true && !enable) {
575 this.flushTasksQueues()
576 }
577 this.opts.enableTasksQueue = enable
578 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
579 }
580
581 /** @inheritDoc */
582 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
583 if (this.opts.enableTasksQueue === true) {
584 this.checkValidTasksQueueOptions(tasksQueueOptions)
585 this.opts.tasksQueueOptions =
586 this.buildTasksQueueOptions(tasksQueueOptions)
587 } else if (this.opts.tasksQueueOptions != null) {
588 delete this.opts.tasksQueueOptions
589 }
590 }
591
592 private buildTasksQueueOptions (
593 tasksQueueOptions: TasksQueueOptions
594 ): TasksQueueOptions {
595 return {
596 concurrency: tasksQueueOptions?.concurrency ?? 1
597 }
598 }
599
600 /**
601 * Whether the pool is full or not.
602 *
603 * The pool filling boolean status.
604 */
605 protected get full (): boolean {
606 return this.workerNodes.length >= this.maxSize
607 }
608
609 /**
610 * Whether the pool is busy or not.
611 *
612 * The pool busyness boolean status.
613 */
614 protected abstract get busy (): boolean
615
616 /**
617 * Whether worker nodes are executing at least one task.
618 *
619 * @returns Worker nodes busyness boolean status.
620 */
621 protected internalBusy (): boolean {
622 return (
623 this.workerNodes.findIndex(workerNode => {
624 return workerNode.usage.tasks.executing === 0
625 }) === -1
626 )
627 }
628
629 /** @inheritDoc */
630 public async execute (data?: Data, name?: string): Promise<Response> {
631 return await new Promise<Response>((resolve, reject) => {
632 const timestamp = performance.now()
633 const workerNodeKey = this.chooseWorkerNode()
634 const task: Task<Data> = {
635 name: name ?? DEFAULT_TASK_NAME,
636 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
637 data: data ?? ({} as Data),
638 timestamp,
639 workerId: this.getWorkerInfo(workerNodeKey).id as number,
640 id: randomUUID()
641 }
642 this.promiseResponseMap.set(task.id as string, {
643 resolve,
644 reject,
645 workerNodeKey
646 })
647 if (
648 this.opts.enableTasksQueue === true &&
649 (this.busy ||
650 this.workerNodes[workerNodeKey].usage.tasks.executing >=
651 (this.opts.tasksQueueOptions?.concurrency as number))
652 ) {
653 this.enqueueTask(workerNodeKey, task)
654 } else {
655 this.executeTask(workerNodeKey, task)
656 }
657 this.checkAndEmitEvents()
658 })
659 }
660
661 /** @inheritDoc */
662 public async destroy (): Promise<void> {
663 await Promise.all(
664 this.workerNodes.map(async (workerNode, workerNodeKey) => {
665 this.flushTasksQueue(workerNodeKey)
666 // FIXME: wait for tasks to be finished
667 const workerExitPromise = new Promise<void>(resolve => {
668 workerNode.worker.on('exit', () => {
669 resolve()
670 })
671 })
672 await this.destroyWorkerNode(workerNodeKey)
673 await workerExitPromise
674 })
675 )
676 }
677
678 /**
679 * Terminates the worker node given its worker node key.
680 *
681 * @param workerNodeKey - The worker node key.
682 */
683 protected abstract destroyWorkerNode (
684 workerNodeKey: number
685 ): void | Promise<void>
686
687 /**
688 * Setup hook to execute code before worker nodes are created in the abstract constructor.
689 * Can be overridden.
690 *
691 * @virtual
692 */
693 protected setupHook (): void {
694 // Intentionally empty
695 }
696
697 /**
698 * Should return whether the worker is the main worker or not.
699 */
700 protected abstract isMain (): boolean
701
702 /**
703 * Hook executed before the worker task execution.
704 * Can be overridden.
705 *
706 * @param workerNodeKey - The worker node key.
707 * @param task - The task to execute.
708 */
709 protected beforeTaskExecutionHook (
710 workerNodeKey: number,
711 task: Task<Data>
712 ): void {
713 const workerUsage = this.workerNodes[workerNodeKey].usage
714 ++workerUsage.tasks.executing
715 this.updateWaitTimeWorkerUsage(workerUsage, task)
716 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
717 task.name as string
718 ) as WorkerUsage
719 ++taskWorkerUsage.tasks.executing
720 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
721 }
722
723 /**
724 * Hook executed after the worker task execution.
725 * Can be overridden.
726 *
727 * @param workerNodeKey - The worker node key.
728 * @param message - The received message.
729 */
730 protected afterTaskExecutionHook (
731 workerNodeKey: number,
732 message: MessageValue<Response>
733 ): void {
734 const workerUsage = this.workerNodes[workerNodeKey].usage
735 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
736 this.updateRunTimeWorkerUsage(workerUsage, message)
737 this.updateEluWorkerUsage(workerUsage, message)
738 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
739 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
740 ) as WorkerUsage
741 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
742 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
743 this.updateEluWorkerUsage(taskWorkerUsage, message)
744 }
745
746 private updateTaskStatisticsWorkerUsage (
747 workerUsage: WorkerUsage,
748 message: MessageValue<Response>
749 ): void {
750 const workerTaskStatistics = workerUsage.tasks
751 --workerTaskStatistics.executing
752 if (message.taskError == null) {
753 ++workerTaskStatistics.executed
754 } else {
755 ++workerTaskStatistics.failed
756 }
757 }
758
759 private updateRunTimeWorkerUsage (
760 workerUsage: WorkerUsage,
761 message: MessageValue<Response>
762 ): void {
763 updateMeasurementStatistics(
764 workerUsage.runTime,
765 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
766 message.taskPerformance?.runTime ?? 0,
767 workerUsage.tasks.executed
768 )
769 }
770
771 private updateWaitTimeWorkerUsage (
772 workerUsage: WorkerUsage,
773 task: Task<Data>
774 ): void {
775 const timestamp = performance.now()
776 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
777 updateMeasurementStatistics(
778 workerUsage.waitTime,
779 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
780 taskWaitTime,
781 workerUsage.tasks.executed
782 )
783 }
784
785 private updateEluWorkerUsage (
786 workerUsage: WorkerUsage,
787 message: MessageValue<Response>
788 ): void {
789 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
790 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
791 updateMeasurementStatistics(
792 workerUsage.elu.active,
793 eluTaskStatisticsRequirements,
794 message.taskPerformance?.elu?.active ?? 0,
795 workerUsage.tasks.executed
796 )
797 updateMeasurementStatistics(
798 workerUsage.elu.idle,
799 eluTaskStatisticsRequirements,
800 message.taskPerformance?.elu?.idle ?? 0,
801 workerUsage.tasks.executed
802 )
803 if (eluTaskStatisticsRequirements.aggregate) {
804 if (message.taskPerformance?.elu != null) {
805 if (workerUsage.elu.utilization != null) {
806 workerUsage.elu.utilization =
807 (workerUsage.elu.utilization +
808 message.taskPerformance.elu.utilization) /
809 2
810 } else {
811 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
812 }
813 }
814 }
815 }
816
817 /**
818 * Chooses a worker node for the next task.
819 *
820 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
821 *
822 * @returns The chosen worker node key
823 */
824 private chooseWorkerNode (): number {
825 if (this.shallCreateDynamicWorker()) {
826 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
827 if (
828 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
829 ) {
830 return workerNodeKey
831 }
832 }
833 return this.workerChoiceStrategyContext.execute()
834 }
835
836 /**
837 * Conditions for dynamic worker creation.
838 *
839 * @returns Whether to create a dynamic worker or not.
840 */
841 private shallCreateDynamicWorker (): boolean {
842 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
843 }
844
845 /**
846 * Sends a message to worker given its worker node key.
847 *
848 * @param workerNodeKey - The worker node key.
849 * @param message - The message.
850 */
851 protected abstract sendToWorker (
852 workerNodeKey: number,
853 message: MessageValue<Data>
854 ): void
855
856 /**
857 * Creates a new worker.
858 *
859 * @returns Newly created worker.
860 */
861 protected abstract createWorker (): Worker
862
863 /**
864 * Creates a new, completely set up worker node.
865 *
866 * @returns New, completely set up worker node key.
867 */
868 protected createAndSetupWorkerNode (): number {
869 const worker = this.createWorker()
870
871 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
872 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
873 worker.on('error', error => {
874 const workerNodeKey = this.getWorkerNodeKey(worker)
875 const workerInfo = this.getWorkerInfo(workerNodeKey)
876 workerInfo.ready = false
877 this.workerNodes[workerNodeKey].closeChannel()
878 this.emitter?.emit(PoolEvents.error, error)
879 if (this.opts.restartWorkerOnError === true && !this.starting) {
880 if (workerInfo.dynamic) {
881 this.createAndSetupDynamicWorkerNode()
882 } else {
883 this.createAndSetupWorkerNode()
884 }
885 }
886 if (this.opts.enableTasksQueue === true) {
887 this.redistributeQueuedTasks(workerNodeKey)
888 }
889 })
890 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
891 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
892 worker.once('exit', () => {
893 this.removeWorkerNode(worker)
894 })
895
896 const workerNodeKey = this.addWorkerNode(worker)
897
898 this.afterWorkerNodeSetup(workerNodeKey)
899
900 return workerNodeKey
901 }
902
903 /**
904 * Creates a new, completely set up dynamic worker node.
905 *
906 * @returns New, completely set up dynamic worker node key.
907 */
908 protected createAndSetupDynamicWorkerNode (): number {
909 const workerNodeKey = this.createAndSetupWorkerNode()
910 this.registerWorkerMessageListener(workerNodeKey, message => {
911 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
912 message.workerId
913 ) as number
914 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
915 if (
916 isKillBehavior(KillBehaviors.HARD, message.kill) ||
917 (message.kill != null &&
918 ((this.opts.enableTasksQueue === false &&
919 workerUsage.tasks.executing === 0) ||
920 (this.opts.enableTasksQueue === true &&
921 workerUsage.tasks.executing === 0 &&
922 this.tasksQueueSize(localWorkerNodeKey) === 0)))
923 ) {
924 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
925 const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
926 if (isAsyncFunction(destroyWorkerNodeBounded)) {
927 (
928 destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
929 )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
930 } else {
931 (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
932 localWorkerNodeKey
933 )
934 }
935 }
936 })
937 const workerInfo = this.getWorkerInfo(workerNodeKey)
938 this.sendToWorker(workerNodeKey, {
939 checkActive: true,
940 workerId: workerInfo.id as number
941 })
942 workerInfo.dynamic = true
943 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
944 workerInfo.ready = true
945 }
946 return workerNodeKey
947 }
948
949 /**
950 * Registers a listener callback on the worker given its worker node key.
951 *
952 * @param workerNodeKey - The worker node key.
953 * @param listener - The message listener callback.
954 */
955 protected abstract registerWorkerMessageListener<
956 Message extends Data | Response
957 >(
958 workerNodeKey: number,
959 listener: (message: MessageValue<Message>) => void
960 ): void
961
962 /**
963 * Method hooked up after a worker node has been newly created.
964 * Can be overridden.
965 *
966 * @param workerNodeKey - The newly created worker node key.
967 */
968 protected afterWorkerNodeSetup (workerNodeKey: number): void {
969 // Listen to worker messages.
970 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
971 // Send the startup message to worker.
972 this.sendStartupMessageToWorker(workerNodeKey)
973 // Send the worker statistics message to worker.
974 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
975 }
976
977 /**
978 * Sends the startup message to worker given its worker node key.
979 *
980 * @param workerNodeKey - The worker node key.
981 */
982 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
983
984 /**
985 * Sends the worker statistics message to worker given its worker node key.
986 *
987 * @param workerNodeKey - The worker node key.
988 */
989 private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
990 this.sendToWorker(workerNodeKey, {
991 statistics: {
992 runTime:
993 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
994 .runTime.aggregate,
995 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
996 .elu.aggregate
997 },
998 workerId: this.getWorkerInfo(workerNodeKey).id as number
999 })
1000 }
1001
1002 private redistributeQueuedTasks (workerNodeKey: number): void {
1003 while (this.tasksQueueSize(workerNodeKey) > 0) {
1004 let targetWorkerNodeKey: number = workerNodeKey
1005 let minQueuedTasks = Infinity
1006 let executeTask = false
1007 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1008 const workerInfo = this.getWorkerInfo(workerNodeId)
1009 if (
1010 workerNodeId !== workerNodeKey &&
1011 workerInfo.ready &&
1012 workerNode.usage.tasks.queued === 0
1013 ) {
1014 if (
1015 this.workerNodes[workerNodeId].usage.tasks.executing <
1016 (this.opts.tasksQueueOptions?.concurrency as number)
1017 ) {
1018 executeTask = true
1019 }
1020 targetWorkerNodeKey = workerNodeId
1021 break
1022 }
1023 if (
1024 workerNodeId !== workerNodeKey &&
1025 workerInfo.ready &&
1026 workerNode.usage.tasks.queued < minQueuedTasks
1027 ) {
1028 minQueuedTasks = workerNode.usage.tasks.queued
1029 targetWorkerNodeKey = workerNodeId
1030 }
1031 }
1032 if (executeTask) {
1033 this.executeTask(
1034 targetWorkerNodeKey,
1035 this.dequeueTask(workerNodeKey) as Task<Data>
1036 )
1037 } else {
1038 this.enqueueTask(
1039 targetWorkerNodeKey,
1040 this.dequeueTask(workerNodeKey) as Task<Data>
1041 )
1042 }
1043 }
1044 }
1045
1046 /**
1047 * This method is the listener registered for each worker message.
1048 *
1049 * @returns The listener function to execute when a message is received from a worker.
1050 */
1051 protected workerListener (): (message: MessageValue<Response>) => void {
1052 return message => {
1053 this.checkMessageWorkerId(message)
1054 if (message.ready != null) {
1055 // Worker ready response received
1056 this.handleWorkerReadyResponse(message)
1057 } else if (message.id != null) {
1058 // Task execution response received
1059 this.handleTaskExecutionResponse(message)
1060 }
1061 }
1062 }
1063
1064 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1065 this.getWorkerInfo(
1066 this.getWorkerNodeKeyByWorkerId(message.workerId) as number
1067 ).ready = message.ready as boolean
1068 if (this.emitter != null && this.ready) {
1069 this.emitter.emit(PoolEvents.ready, this.info)
1070 }
1071 }
1072
1073 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1074 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1075 if (promiseResponse != null) {
1076 if (message.taskError != null) {
1077 this.emitter?.emit(PoolEvents.taskError, message.taskError)
1078 promiseResponse.reject(message.taskError.message)
1079 } else {
1080 promiseResponse.resolve(message.data as Response)
1081 }
1082 const workerNodeKey = promiseResponse.workerNodeKey
1083 this.afterTaskExecutionHook(workerNodeKey, message)
1084 this.promiseResponseMap.delete(message.id as string)
1085 if (
1086 this.opts.enableTasksQueue === true &&
1087 this.tasksQueueSize(workerNodeKey) > 0 &&
1088 this.workerNodes[workerNodeKey].usage.tasks.executing <
1089 (this.opts.tasksQueueOptions?.concurrency as number)
1090 ) {
1091 this.executeTask(
1092 workerNodeKey,
1093 this.dequeueTask(workerNodeKey) as Task<Data>
1094 )
1095 }
1096 this.workerChoiceStrategyContext.update(workerNodeKey)
1097 }
1098 }
1099
1100 private checkAndEmitEvents (): void {
1101 if (this.emitter != null) {
1102 if (this.busy) {
1103 this.emitter.emit(PoolEvents.busy, this.info)
1104 }
1105 if (this.type === PoolTypes.dynamic && this.full) {
1106 this.emitter.emit(PoolEvents.full, this.info)
1107 }
1108 }
1109 }
1110
1111 /**
1112 * Gets the worker information given its worker node key.
1113 *
1114 * @param workerNodeKey - The worker node key.
1115 * @returns The worker information.
1116 */
1117 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1118 return this.workerNodes[workerNodeKey].info
1119 }
1120
1121 /**
1122 * Adds the given worker in the pool worker nodes.
1123 *
1124 * @param worker - The worker.
1125 * @returns The added worker node key.
1126 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1127 */
1128 private addWorkerNode (worker: Worker): number {
1129 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
1130 // Flag the worker node as ready at pool startup.
1131 if (this.starting) {
1132 workerNode.info.ready = true
1133 }
1134 this.workerNodes.push(workerNode)
1135 const workerNodeKey = this.getWorkerNodeKey(worker)
1136 if (workerNodeKey === -1) {
1137 throw new Error('Worker node not found')
1138 }
1139 return workerNodeKey
1140 }
1141
1142 /**
1143 * Removes the given worker from the pool worker nodes.
1144 *
1145 * @param worker - The worker.
1146 */
1147 private removeWorkerNode (worker: Worker): void {
1148 const workerNodeKey = this.getWorkerNodeKey(worker)
1149 if (workerNodeKey !== -1) {
1150 this.workerNodes.splice(workerNodeKey, 1)
1151 this.workerChoiceStrategyContext.remove(workerNodeKey)
1152 }
1153 }
1154
1155 /**
1156 * Executes the given task on the worker given its worker node key.
1157 *
1158 * @param workerNodeKey - The worker node key.
1159 * @param task - The task to execute.
1160 */
1161 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1162 this.beforeTaskExecutionHook(workerNodeKey, task)
1163 this.sendToWorker(workerNodeKey, task)
1164 }
1165
1166 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1167 return this.workerNodes[workerNodeKey].enqueueTask(task)
1168 }
1169
1170 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1171 return this.workerNodes[workerNodeKey].dequeueTask()
1172 }
1173
1174 private tasksQueueSize (workerNodeKey: number): number {
1175 return this.workerNodes[workerNodeKey].tasksQueueSize()
1176 }
1177
1178 private flushTasksQueue (workerNodeKey: number): void {
1179 while (this.tasksQueueSize(workerNodeKey) > 0) {
1180 this.executeTask(
1181 workerNodeKey,
1182 this.dequeueTask(workerNodeKey) as Task<Data>
1183 )
1184 }
1185 this.workerNodes[workerNodeKey].clearTasksQueue()
1186 }
1187
1188 private flushTasksQueues (): void {
1189 for (const [workerNodeKey] of this.workerNodes.entries()) {
1190 this.flushTasksQueue(workerNodeKey)
1191 }
1192 }
1193 }