refactor: cleanup pool.execute() structure
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round,
17 updateMeasurementStatistics
18 } from '../utils'
19 import { KillBehaviors } from '../worker/worker-options'
20 import {
21 type IPool,
22 PoolEmitter,
23 PoolEvents,
24 type PoolInfo,
25 type PoolOptions,
26 type PoolType,
27 PoolTypes,
28 type TasksQueueOptions
29 } from './pool'
30 import type {
31 IWorker,
32 IWorkerNode,
33 WorkerInfo,
34 WorkerType,
35 WorkerUsage
36 } from './worker'
37 import {
38 type MeasurementStatisticsRequirements,
39 Measurements,
40 WorkerChoiceStrategies,
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
43 } from './selection-strategies/selection-strategies-types'
44 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
45 import { version } from './version'
46 import { WorkerNode } from './worker-node'
47
48 /**
49 * Base class that implements some shared logic for all poolifier pools.
50 *
51 * @typeParam Worker - Type of worker which manages this pool.
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
54 */
55 export abstract class AbstractPool<
56 Worker extends IWorker,
57 Data = unknown,
58 Response = unknown
59 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by the message id of the submitted task.
 *
 * Each entry holds the worker the task was assigned to together with the
 * `resolve` and `reject` callbacks of the promise returned to the caller.
 * The entry is looked up by message id when a task response comes back from a worker.
 */
protected promiseResponseMap: Map<
string,
PromiseResponseWrapper<Worker, Response>
> = new Map()

/**
 * Context wrapping the worker choice strategy currently in use.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * `true` only while the constructor is creating the initial worker nodes.
 */
private readonly starting: boolean
/**
 * Timestamp taken at the end of the constructor, once the pool has been started.
 */
private readonly startTimestamp
96
/**
 * Validates the arguments, builds the worker choice strategy context and starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if not called from the main worker, or if one of the arguments is rejected by the validation helpers.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Argument validation; checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // `starting` is only true while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
142
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is missing, not a string, blank, or does not exist.
 */
private checkFilePath (filePath: string): void {
  const missingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (missingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const fileFound = existsSync(filePath)
  if (!fileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
155
/**
 * Checks the number of workers given to the constructor.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is not specified.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
173
/**
 * Checks the size boundaries of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - Minimum pool size.
 * @param max - Maximum pool size.
 * @throws RangeError if `min` is greater than `max`, if `max` is zero, or if `min` equals `max`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
191
/**
 * Validates the pool options and stores the default value of every option left unset.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  // Worker choice strategy: round robin unless specified.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
218
/**
 * Checks that the given value is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the worker choice strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
228
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if the number of weights differs from the pool maximum size, or if the measurement is not a known one.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When given, there must be as many weights as the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
256
/**
 * Checks the tasks queue options. Nullish options and a nullish concurrency are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object, or if the concurrency is not a safe integer.
 * @throws Error if the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
280
/**
 * Creates worker nodes until the number of non dynamic ones reaches `numberOfWorkers`.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }
}
292
/** @inheritDoc */
public get info (): PoolInfo {
  // Adds up a numeric value picked from every worker node.
  const sumOver = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countWhere = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode) => (matches(workerNode) ? count + 1 : count),
      0
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWhere(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWhere(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOver(workerNode => workerNode.usage.tasks.executed),
    executingTasks: sumOver(workerNode => workerNode.usage.tasks.executing),
    queuedTasks: sumOver(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sumOver(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOver(workerNode => workerNode.usage.tasks.failed),
    // Run time statistics, only when the strategy aggregates the run time.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // NOTE: the division below is 0 / 0 as long as no task has been executed.
        average: round(
          sumOver(workerNode => workerNode.usage.runTime?.aggregate ?? 0) /
            sumOver(workerNode => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    // Wait time statistics, only when the strategy aggregates the wait time.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // NOTE: the division below is 0 / 0 as long as no task has been executed.
        average: round(
          sumOver(workerNode => workerNode.usage.waitTime?.aggregate ?? 0) /
            sumOver(workerNode => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
429
/**
 * Whether the pool is ready: at least `minSize` non dynamic worker nodes are flagged as ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
441
/**
 * Approximate pool utilization: aggregated tasks run time plus wait time over all worker nodes,
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
462
463 /**
464 * Pool type.
465 *
466 * If it is `'dynamic'`, it provides the `max` property.
467 */
468 protected abstract get type (): PoolType
469
470 /**
471 * Gets the worker type.
472 */
473 protected abstract get worker (): WorkerType
474
475 /**
476 * Pool minimum size.
477 */
478 protected abstract get minSize (): number
479
480 /**
481 * Pool maximum size.
482 */
483 protected abstract get maxSize (): number
484
/**
 * Looks up a worker by the id stored in its worker node information.
 *
 * @param workerId - The worker id.
 * @returns The matching worker, `undefined` if no worker node has that id.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    workerNode => workerNode.info.id === workerId
  )
  return matchingWorkerNode?.worker
}
495
/**
 * Checks that the worker id carried by a received message, when there is one, belongs to a worker of this pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id matches no worker of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  // Messages without a worker id are accepted as is.
  if (workerId == null) {
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
512
/**
 * Gets the worker node key of the given worker, i.e. its index in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
protected getWorkerNodeKey (worker: Worker): number {
  const isNodeOfWorker = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(isNodeOfWorker)
}
524
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and send it the statistics settings again.
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
543
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options are never stored.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
554
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Turning the queue off: flush the tasks still queued first.
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
566
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any tasks queue options previously stored.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
577
/**
 * Builds the tasks queue options, using a concurrency of `1` when none is given.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly nullish.
 * @returns The tasks queue options to store.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
585
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes has reached the pool maximum size.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
594
595 /**
596 * Whether the pool is busy or not.
597 *
598 * The pool busyness boolean status.
599 */
600 protected abstract get busy (): boolean
601
/**
 * Whether no worker node is idle, i.e. every worker node is executing at least one task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
614
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerId = this.getWorkerInfo(workerNodeKey).id as number
    const submittedTask: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp,
      workerId,
      id: randomUUID()
    }
    // Keep the promise callbacks so that the worker response can settle the promise.
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
    // With the tasks queue enabled, queue the task when the pool is busy or
    // when the chosen worker node already runs as many tasks as the configured concurrency.
    const shallEnqueue =
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[workerNodeKey].usage.tasks.executing >=
          ((this.opts.tasksQueueOptions as TasksQueueOptions)
            .concurrency as number))
    if (shallEnqueue) {
      this.enqueueTask(workerNodeKey, submittedTask)
    } else {
      this.executeTask(workerNodeKey, submittedTask)
    }
    this.checkAndEmitEvents()
  })
}
647
/** @inheritDoc */
public async destroy (): Promise<void> {
  await Promise.all(
    this.workerNodes.map(async (workerNode, workerNodeKey) => {
      // Send the tasks still queued on that worker node before terminating it.
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // The exit is awaited a single time: use a one-shot listener so that
      // no listener is left registered on the worker once it has fired.
      const workerExitPromise = new Promise<void>(resolve => {
        workerNode.worker.once('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await workerExitPromise
    })
  )
}
664
665 /**
666 * Terminates the given worker.
667 *
668 * @param worker - A worker within `workerNodes`.
669 */
670 protected abstract destroyWorker (worker: Worker): void | Promise<void>
671
/**
 * Hook called by the abstract constructor right before the pool is started.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default setup
}
681
682 /**
683 * Should return whether the worker is the main worker or not.
684 */
685 protected abstract isMain (): boolean
686
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time statistics,
 * first on the worker node usage, then on its usage for the task name.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const recordTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  recordTaskStart(this.workerNodes[workerNodeKey].usage)
  recordTaskStart(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      task.name as string
    ) as WorkerUsage
  )
}
707
/**
 * Hook executed after the worker task execution.
 * Updates the tasks, run time and ELU statistics, first on the worker node usage,
 * then on its usage for the task name found in the message (default task name if absent).
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerNodeKey = this.getWorkerNodeKey(worker)
  recordTaskEnd(this.workerNodes[workerNodeKey].usage)
  recordTaskEnd(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
731
/**
 * Updates the tasks counters of a worker usage once a task response has been received:
 * one executing task less, and one more executed or failed task depending on the task outcome.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
744
/**
 * Updates the run time statistics of a worker usage with the task run time found in the message (`0` if absent).
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
756
/**
 * Updates the wait time statistics of a worker usage with the time elapsed since the task timestamp.
 * A task without timestamp counts for a wait time of `0`.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const submissionTimestamp = task.timestamp ?? now
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - submissionTimestamp,
    workerUsage.tasks.executed
  )
}
770
/**
 * Updates the ELU (active, idle and utilization) statistics of a worker usage
 * with the ELU figures found in the message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  // The utilization is only tracked when ELU aggregation is required and the message carries ELU figures.
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // First value is taken as is, later ones are averaged with the previous utilization.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
802
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met; it is chosen right away
 * if the strategy policy asks for it. In any other case the choice is delegated to the worker choice strategy.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
821
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and has no idle worker node.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
830
831 /**
832 * Sends a message to the given worker.
833 *
834 * @param worker - The worker which should receive the message.
835 * @param message - The message.
836 */
837 protected abstract sendToWorker (
838 worker: Worker,
839 message: MessageValue<Data>
840 ): void
841
842 /**
843 * Creates a new worker.
844 *
845 * @returns Newly created worker.
846 */
847 protected abstract createWorker (): Worker
848
/**
 * Creates a new worker, registers the user and pool event handlers on it,
 * and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Pool side error handling: flag the worker as not ready, close its channel,
  // optionally replace it, then move its queued tasks to other worker nodes.
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    workerInfo.ready = false
    this.workerNodes[workerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart) {
      // The replacement worker is of the same kind as the faulty one.
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker no longer belongs to the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.addWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
888
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 * The worker is destroyed when it sends a kill message, see the listener below for the exact conditions.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  // Whether the worker node has nothing left to do: no task executing and,
  // with the tasks queue enabled, no task queued either.
  const hasNoPendingTask = (workerNodeKey: number): boolean => {
    if (this.opts.enableTasksQueue === false) {
      return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
    }
    if (this.opts.enableTasksQueue === true) {
      return (
        this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
        this.tasksQueueSize(workerNodeKey) === 0
      )
    }
    return false
  }
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && hasNoPendingTask(workerNodeKey))
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const workerInfo = this.getWorkerInfoByWorker(worker)
  workerInfo.dynamic = true
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    // The strategy uses the dynamic worker right away: flag it as ready without waiting.
    workerInfo.ready = true
  }
  this.sendToWorker(worker, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  return worker
}
922
923 /**
924 * Registers a listener callback on the given worker.
925 *
926 * @param worker - The worker which should register a listener.
927 * @param listener - The message listener callback.
928 */
929 protected abstract registerWorkerMessageListener<
930 Message extends Data | Response
931 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
932
/**
 * Hook called once a newly created worker has been added to the pool worker nodes.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // 1. Route the worker messages to the pool listener.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
  // 2. Send the startup message.
  this.sendStartupMessageToWorker(worker)
  // 3. Tell the worker which task statistics to compute.
  this.setWorkerStatistics(worker)
}
947
948 /**
949 * Sends the startup message to the given worker.
950 *
951 * @param worker - The worker which should receive the startup message.
952 */
953 protected abstract sendStartupMessageToWorker (worker: Worker): void
954
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each task, the target is the first other ready worker node with an empty queue,
 * or else the other ready worker node with the fewest queued tasks.
 * When no other ready worker node exists, the remaining tasks are left where they are:
 * re-enqueueing them on the same worker node would never empty its queue and the loop would never end.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only other, ready worker nodes are eligible.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // Empty queue: no better candidate can be found.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No eligible worker node: stop instead of looping forever on the same queue.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
984
/**
 * Builds the listener registered for each worker message.
 * The listener checks the worker id of the message, then handles it as a worker ready response
 * if `ready` is set, else as a task execution response if `id` is set.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, id } = message
    if (ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1002
/**
 * Stores the ready status sent by a worker, then emits the `ready` pool event if events are enabled and the pool is ready.
 *
 * @param message - The received message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId) as Worker
  const workerInfo = this.getWorkerInfoByWorker(worker)
  workerInfo.ready = message.ready as boolean
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1011
/**
 * Settles the promise bound to the message id, updates the worker usage,
 * then runs the next queued task of the worker node, if any.
 * Messages whose id matches no pending promise are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    // The promise is rejected with the error message, not with an Error instance.
    promiseResponse.reject(taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(message.id as string)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1036
/**
 * Emits the `busy` pool event when the pool is busy and the `full` pool event when a dynamic pool is full.
 * Does nothing when events are disabled.
 */
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1047
/**
 * Gets the information of the worker held by the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1057
/**
 * Gets the information of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker information.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not in the pool worker nodes.
 */
protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey !== -1) {
    return this.workerNodes[workerNodeKey].info
  }
  throw new Error('Worker not found')
}
1072
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private addWorkerNode (worker: Worker): number {
  const newWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  // Worker nodes created while the pool is starting are flagged as ready right away.
  if (this.starting) {
    newWorkerNode.info.ready = true
  }
  const workerNodesLength = this.workerNodes.push(newWorkerNode)
  return workerNodesLength
}
1087
/**
 * Removes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context. Does nothing for an unknown worker.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1100
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1111
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1115
/**
 * Takes a task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1119
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1123
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1133
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  this.workerNodes.forEach((_workerNode, workerNodeKey) => {
    this.flushTasksQueue(workerNodeKey)
  })
}
1139
/**
 * Sends to the given worker which task statistics (run time, ELU) the worker choice strategy requires.
 *
 * @param worker - The worker which should receive the statistics settings.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfoByWorker(worker).id as number
  this.sendToWorker(worker, {
    statistics: { runTime, elu },
    workerId
  })
}
1152 }