refactor: factor out measurement statistics update
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round,
17 updateMeasurementStatistics
18 } from '../utils'
19 import { KillBehaviors } from '../worker/worker-options'
20 import {
21 type IPool,
22 PoolEmitter,
23 PoolEvents,
24 type PoolInfo,
25 type PoolOptions,
26 type PoolType,
27 PoolTypes,
28 type TasksQueueOptions
29 } from './pool'
30 import type {
31 IWorker,
32 IWorkerNode,
33 MessageHandler,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 Measurements,
40 WorkerChoiceStrategies,
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
43 } from './selection-strategies/selection-strategies-types'
44 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
45 import { version } from './version'
46 import { WorkerNode } from './worker-node'
47
48 /**
49 * Base class that implements some shared logic for all poolifier pools.
50 *
51 * @typeParam Worker - Type of worker which manages this pool.
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
54 */
55 export abstract class AbstractPool<
56 Worker extends IWorker,
57 Data = unknown,
58 Response = unknown
59 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task message id.
 *
 * - `key`: The message id of each submitted task.
 * - `value`: The worker the task was submitted to and the `resolve`/`reject` callbacks of the promise handed back to the caller.
 *
 * When a task execution response arrives from a worker, the entry matching its message id is used to settle the promise and is then deleted.
 */
protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
> = new Map<string, PromiseResponseWrapper<Worker, Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>

/**
 * Whether the pool is starting or not.
 * It is `true` only while the constructor creates the initial worker nodes.
 */
private readonly starting: boolean
/**
 * The start timestamp of the pool, taken from `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp: number
96
/**
 * Builds a poolifier pool: validates the arguments, wires the worker choice
 * strategy context, runs the setup hook, then creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If not called from the main context or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const calledFromMain = this.isMain()
  if (!calledFromMain) {
    throw new Error('Cannot start a pool from a worker!')
  }
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` attached when these methods are handed around as callbacks.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // `starting` is raised only for the duration of the initial worker creation.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
142
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank, or does not exist on disk.
 */
private checkFilePath (filePath: string): void {
  const isUsablePath =
    filePath != null &&
    typeof filePath === 'string' &&
    filePath.trim().length !== 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const fileFound = existsSync(filePath)
  if (!fileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
155
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number is missing.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only fixed pools refuse an empty set of workers.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
173
/**
 * Validates the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `min` is greater than `max`, `max` is zero, or `min` equals `max`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
191
/**
 * Validates the pool options and fills in the defaults on `this.opts`:
 * round robin strategy, default strategy options, worker restart on error
 * enabled, events enabled and tasks queue disabled.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const strategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = strategy
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  const strategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = strategyOptions
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    return
  }
  // Tasks queue options only matter once the queue is enabled.
  const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(queueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
}
218
/**
 * Ensures the given worker choice strategy is one of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
228
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, there must be exactly one per possible worker node.
  const weightsMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (weightsMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const unknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (unknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
    )
  }
}
256
/**
 * Validates the tasks queue options. Missing options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to validate without an explicit concurrency.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
280
/**
 * Creates worker nodes until the number of non dynamic worker nodes reaches
 * `numberOfWorkers`.
 */
private startPool (): void {
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }
}
292
/** @inheritDoc */
public get info (): PoolInfo {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a per worker node figure over the whole pool.
  const sumOverNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countNodes = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode) => (matches(workerNode) ? count + 1 : count),
      0
    )
  // Builds the pool wide summary of one time measurement.
  // eslint-disable-next-line @typescript-eslint/explicit-function-return-type
  const summarize = (
    measurement: 'runTime' | 'waitTime',
    includeMedian: boolean
  ) => ({
    minimum: round(
      Math.min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      Math.max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    // Aggregated time divided by the total number of executed tasks.
    average: round(
      sumOverNodes(
        workerNode => workerNode.usage[measurement]?.aggregate ?? 0
      ) / sumOverNodes(workerNode => workerNode.usage.tasks?.executed ?? 0)
    ),
    ...(includeMedian && {
      median: round(
        median(
          this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.median ?? 0
          )
        )
      )
    })
  })
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both run time and wait time aggregation.
    ...(requirements.runTime.aggregate &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverNodes(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sumOverNodes(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOverNodes(workerNode => workerNode.usage.tasks.failed),
    ...(requirements.runTime.aggregate && {
      runTime: summarize('runTime', requirements.runTime.median)
    }),
    ...(requirements.waitTime.aggregate && {
      waitTime: summarize('waitTime', requirements.waitTime.median)
    })
  }
}
429
/**
 * Whether the pool is ready: the number of ready, non dynamic worker nodes
 * has reached the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
441
/**
 * Gets the approximate pool utilization: aggregated tasks run time plus
 * aggregated tasks wait time, divided by the time elapsed since the pool
 * start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
462
/**
 * The pool type, supplied by the concrete pool implementation.
 */
protected abstract get type (): PoolType

/**
 * The worker type, supplied by the concrete pool implementation.
 */
protected abstract get worker (): WorkerType

/**
 * The pool minimum size.
 */
protected abstract get minSize (): number

/**
 * The pool maximum size.
 */
protected abstract get maxSize (): number
484
/**
 * Looks up a worker by its id.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    workerNode => workerNode.info.id === workerId
  )
  return matchingWorkerNode?.worker
}
495
/**
 * Checks that the worker id carried by a message received from a worker
 * belongs to this pool. Messages without a worker id are accepted.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  if (message.workerId == null) {
    return
  }
  if (this.getWorkerById(message.workerId) != null) {
    return
  }
  throw new Error(
    `Worker message received from unknown worker '${message.workerId}'`
  )
}
512
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
524
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Start over with clean usage data and re-send the statistics settings.
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
543
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Store the validated options, then hand them over to the strategy context.
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  const storedOptions = this.opts.workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(storedOptions)
}
554
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: run whatever is still queued first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
566
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
}
577
/**
 * Builds the effective tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly missing.
 * @returns The tasks queue options with the concurrency always set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
585
/**
 * Whether the pool is full or not: the number of worker nodes has reached
 * the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
594
/**
 * Whether the pool is busy or not.
 *
 * The busyness criteria is supplied by the concrete pool implementation.
 */
protected abstract get busy (): boolean
601
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` when no worker node is idle, `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
614
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const submissionTimestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name: name ?? DEFAULT_TASK_NAME,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp: submissionTimestamp,
    workerId: this.getWorkerInfo(workerNodeKey).id as number,
    id: randomUUID()
  }
  // Register the settle callbacks before the task can reach the worker.
  const response = new Promise<Response>((resolve, reject) => {
    const worker = this.workerNodes[workerNodeKey].worker
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      worker
    })
  })
  // Queue the task when the pool is busy or the chosen worker node is
  // already running as many tasks as the configured concurrency allows.
  const mustQueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustQueue) {
    this.enqueueTask(workerNodeKey, task)
  } else {
    this.executeTask(workerNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
649
/** @inheritDoc */
public async destroy (): Promise<void> {
  const shutdowns = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // Listen for the exit before asking the worker to terminate.
      const exited = new Promise<void>(resolve => {
        workerNode.worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await exited
    }
  )
  await Promise.all(shutdowns)
}
666
/**
 * Terminates the given worker. Supplied by the concrete pool implementation.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
673
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default behavior: subclasses override this when they need it.
}
683
/**
 * Should return whether the current context is the main one or not.
 * The constructor refuses to build a pool when this returns `false`.
 */
protected abstract isMain (): boolean
688
/**
 * Hook executed before the worker task execution: increments the executing
 * tasks counter and records the task wait time, on the worker node usage
 * and on its per task name usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const recordTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  recordTaskStart(this.workerNodes[workerNodeKey].usage)
  recordTaskStart(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      task.name as string
    ) as WorkerUsage
  )
}
709
/**
 * Hook executed after the worker task execution: updates the tasks counters,
 * the run time and the event loop utilization, on the worker node usage and
 * on its per task name usage.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  recordTaskEnd(this.workerNodes[workerNodeKey].usage)
  // The task name falls back to the default one when the message has none.
  recordTaskEnd(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
733
/**
 * Updates the tasks counters of a worker usage once a task has finished:
 * one less executing task, and one more executed or failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const counters = workerUsage.tasks
  --counters.executing
  if (message.taskError != null) {
    ++counters.failed
    return
  }
  ++counters.executed
}
746
/**
 * Updates the run time statistics of a worker usage with the run time
 * reported in the message, or `0` when the message carries none.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
758
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp. A task without timestamp counts as a zero wait.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const submittedAt = task.timestamp ?? now
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - submittedAt,
    workerUsage.tasks.executed
  )
}
772
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage:
 * active and idle measurements, then the utilization when ELU aggregation
 * is required and the message carries ELU data.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  const executedTasks = workerUsage.tasks.executed
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    executedTasks
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    executedTasks
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // First value is stored as is, later ones are averaged with the stored one.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
805
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met. It
 * is returned directly if the strategy policy asks to use dynamic workers,
 * otherwise the choice is delegated to the worker choice strategy.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
824
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * all its worker nodes are executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
833
/**
 * Sends a message to the given worker. Supplied by the concrete pool implementation.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void

/**
 * Creates a new worker. Supplied by the concrete pool implementation.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
851
/**
 * Creates a new worker, attaches the user and internal event handlers to it,
 * adds it to the pool worker nodes and runs the after setup hook.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    workerInfo.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    // Replace the failed worker with one of the same kind, except at startup.
    const mustRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (mustRestart && workerInfo.dynamic) {
      this.createAndSetupDynamicWorker()
    } else if (mustRestart) {
      this.createAndSetupWorker()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker leaves the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.addWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
890
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, the worker gets a listener that destroys it
 * on a kill message, is flagged as dynamic and is sent a `checkActive` message.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
    const terminate = (): void => {
      void (this.destroyWorker(worker) as Promise<void>)
    }
    // A hard kill is honored unconditionally.
    if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
      terminate()
      return
    }
    if (message.kill == null) {
      return
    }
    const queueEnabled = this.opts.enableTasksQueue
    if (queueEnabled !== true && queueEnabled !== false) {
      return
    }
    // Any other kill waits until the worker node has nothing left to do.
    if (this.workerNodes[workerNodeKey].usage.tasks.executing !== 0) {
      return
    }
    if (queueEnabled && this.tasksQueueSize(workerNodeKey) !== 0) {
      return
    }
    terminate()
  })
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  workerInfo.dynamic = true
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    workerInfo.ready = true
  }
  this.sendToWorker(worker, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  return worker
}
924
/**
 * Attaches a `'message'` event listener to the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
937
/**
 * Hook executed once a worker has been newly created and added to the pool worker nodes.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // 1. Start listening to the messages coming from the worker.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
  // 2. Tell the worker it has been started.
  this.sendWorkerStartupMessage(worker)
  // 3. Tell the worker which task statistics to compute.
  this.setWorkerStatistics(worker)
}
952
/**
 * Sends the startup message, carrying `ready: false` and the worker id, to the given worker.
 *
 * @param worker - The worker to notify.
 */
private sendWorkerStartupMessage (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(worker, { ready: false, workerId })
}
959
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * Each task goes to the first other ready worker node with no queued task,
 * or else to the other ready worker node with the fewest queued tasks.
 * When no other ready worker node exists, the remaining tasks are left in
 * place: moving a task back onto its own queue would never empty it and
 * would loop forever.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue must be emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // -1 means no eligible target has been found.
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty queue cannot be beaten: stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other ready worker node can take over the tasks.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
989
/**
 * Builds the listener registered for each worker message. The listener
 * validates the worker id, then dispatches worker ready responses and task
 * execution responses to their handlers.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1007
/**
 * Handles a worker ready response: stores the ready status on the worker
 * info and emits the pool `ready` event when the pool is ready.
 *
 * @param message - The received message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId)
  const workerInfo = this.getWorkerInfo(
    this.getWorkerNodeKey(worker as Worker)
  )
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null) {
    return
  }
  if (this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1016
/**
 * Handles a task execution response: settles the promise bound to the
 * message id, updates the worker usage, runs the next queued task of the
 * worker node if any, then updates the worker choice strategy.
 * Responses with an unknown message id are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1041
/**
 * Emits the pool `busy` event when the pool is busy and the pool `full`
 * event when a dynamic pool is full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1052
/**
 * Gets the information of the worker held by the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1061
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  // Worker nodes created during the pool startup are ready from the start.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  return this.workerNodes.length
}
1076
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy. Does nothing when the worker is not in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1089
/**
 * Runs the before execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const targetWorker = this.workerNodes[workerNodeKey].worker
  this.sendToWorker(targetWorker, task)
}
1100
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}

/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}

/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1112
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  for (
    let remaining = this.tasksQueueSize(workerNodeKey);
    remaining > 0;
    remaining = this.tasksQueueSize(workerNodeKey)
  ) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1122
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1128
/**
 * Tells the given worker which task statistics to compute, according to the
 * run time and ELU aggregation requirements of the worker choice strategy.
 *
 * @param worker - The worker to configure.
 */
private setWorkerStatistics (worker: Worker): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: requirements.runTime.aggregate,
    elu: requirements.elu.aggregate
  }
  const workerId = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    .id as number
  this.sendToWorker(worker, { statistics, workerId })
}
1141 }