refactor: remove unneeded condition in measurement statistics code
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11 } from '../utils'
12 import { KillBehaviors } from '../worker/worker-options'
13 import { CircularArray } from '../circular-array'
14 import { Queue } from '../queue'
15 import {
16 type IPool,
17 PoolEmitter,
18 PoolEvents,
19 type PoolInfo,
20 type PoolOptions,
21 type PoolType,
22 PoolTypes,
23 type TasksQueueOptions,
24 type WorkerType,
25 WorkerTypes
26 } from './pool'
27 import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerInfo,
32 WorkerNode,
33 WorkerUsage
34 } from './worker'
35 import {
36 Measurements,
37 WorkerChoiceStrategies,
38 type WorkerChoiceStrategy,
39 type WorkerChoiceStrategyOptions
40 } from './selection-strategies/selection-strategies-types'
41 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
42 import { version } from './version'
43
44 /**
45 * Base class that implements some shared logic for all poolifier pools.
46 *
47 * @typeParam Worker - Type of worker which manages this pool.
48 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
49 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
50 */
51 export abstract class AbstractPool<
52 Worker extends IWorker,
53 Data = unknown,
54 Response = unknown
55 > implements IPool<Worker, Data, Response> {
56 /** @inheritDoc */
57 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
58
59 /** @inheritDoc */
60 public readonly emitter?: PoolEmitter
61
62 /**
63 * The execution response promise map.
64 *
65 * - `key`: The message id of each submitted task.
66 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
67 *
68 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
69 */
70 protected promiseResponseMap: Map<
71 string,
72 PromiseResponseWrapper<Worker, Response>
73 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
74
75 /**
76 * Worker choice strategy context referencing a worker choice algorithm implementation.
77 */
78 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
79 Worker,
80 Data,
81 Response
82 >
83
84 /**
85 * The start timestamp of the pool.
86 */
87 private readonly startTimestamp
88
89 /**
90 * Constructs a new poolifier pool.
91 *
92 * @param numberOfWorkers - Number of workers that this pool should manage.
93 * @param filePath - Path to the worker file.
94 * @param opts - Options for the pool.
95 */
96 public constructor (
97 protected readonly numberOfWorkers: number,
98 protected readonly filePath: string,
99 protected readonly opts: PoolOptions<Worker>
100 ) {
101 if (!this.isMain()) {
102 throw new Error('Cannot start a pool from a worker!')
103 }
104 this.checkNumberOfWorkers(this.numberOfWorkers)
105 this.checkFilePath(this.filePath)
106 this.checkPoolOptions(this.opts)
107
108 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
109 this.executeTask = this.executeTask.bind(this)
110 this.enqueueTask = this.enqueueTask.bind(this)
111 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
112
113 if (this.opts.enableEvents === true) {
114 this.emitter = new PoolEmitter()
115 }
116 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
117 Worker,
118 Data,
119 Response
120 >(
121 this,
122 this.opts.workerChoiceStrategy,
123 this.opts.workerChoiceStrategyOptions
124 )
125
126 this.setupHook()
127
128 while (this.workerNodes.length < this.numberOfWorkers) {
129 this.createAndSetupWorker()
130 }
131
132 this.startTimestamp = performance.now()
133 }
134
135 private checkFilePath (filePath: string): void {
136 if (
137 filePath == null ||
138 (typeof filePath === 'string' && filePath.trim().length === 0)
139 ) {
140 throw new Error('Please specify a file with a worker implementation')
141 }
142 }
143
144 private checkNumberOfWorkers (numberOfWorkers: number): void {
145 if (numberOfWorkers == null) {
146 throw new Error(
147 'Cannot instantiate a pool without specifying the number of workers'
148 )
149 } else if (!Number.isSafeInteger(numberOfWorkers)) {
150 throw new TypeError(
151 'Cannot instantiate a pool with a non safe integer number of workers'
152 )
153 } else if (numberOfWorkers < 0) {
154 throw new RangeError(
155 'Cannot instantiate a pool with a negative number of workers'
156 )
157 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
158 throw new Error('Cannot instantiate a fixed pool with no worker')
159 }
160 }
161
162 private checkPoolOptions (opts: PoolOptions<Worker>): void {
163 if (isPlainObject(opts)) {
164 this.opts.workerChoiceStrategy =
165 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
166 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
167 this.opts.workerChoiceStrategyOptions =
168 opts.workerChoiceStrategyOptions ??
169 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
170 this.checkValidWorkerChoiceStrategyOptions(
171 this.opts.workerChoiceStrategyOptions
172 )
173 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
174 this.opts.enableEvents = opts.enableEvents ?? true
175 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
176 if (this.opts.enableTasksQueue) {
177 this.checkValidTasksQueueOptions(
178 opts.tasksQueueOptions as TasksQueueOptions
179 )
180 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
181 opts.tasksQueueOptions as TasksQueueOptions
182 )
183 }
184 } else {
185 throw new TypeError('Invalid pool options: must be a plain object')
186 }
187 }
188
189 private checkValidWorkerChoiceStrategy (
190 workerChoiceStrategy: WorkerChoiceStrategy
191 ): void {
192 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
193 throw new Error(
194 `Invalid worker choice strategy '${workerChoiceStrategy}'`
195 )
196 }
197 }
198
199 private checkValidWorkerChoiceStrategyOptions (
200 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
201 ): void {
202 if (!isPlainObject(workerChoiceStrategyOptions)) {
203 throw new TypeError(
204 'Invalid worker choice strategy options: must be a plain object'
205 )
206 }
207 if (
208 workerChoiceStrategyOptions.weights != null &&
209 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
210 ) {
211 throw new Error(
212 'Invalid worker choice strategy options: must have a weight for each worker node'
213 )
214 }
215 if (
216 workerChoiceStrategyOptions.measurement != null &&
217 !Object.values(Measurements).includes(
218 workerChoiceStrategyOptions.measurement
219 )
220 ) {
221 throw new Error(
222 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
223 )
224 }
225 }
226
227 private checkValidTasksQueueOptions (
228 tasksQueueOptions: TasksQueueOptions
229 ): void {
230 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
231 throw new TypeError('Invalid tasks queue options: must be a plain object')
232 }
233 if (
234 tasksQueueOptions?.concurrency != null &&
235 !Number.isSafeInteger(tasksQueueOptions.concurrency)
236 ) {
237 throw new TypeError(
238 'Invalid worker tasks concurrency: must be an integer'
239 )
240 }
241 if (
242 tasksQueueOptions?.concurrency != null &&
243 tasksQueueOptions.concurrency <= 0
244 ) {
245 throw new Error(
246 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
247 )
248 }
249 }
250
251 /** @inheritDoc */
252 public get info (): PoolInfo {
253 return {
254 version,
255 type: this.type,
256 worker: this.worker,
257 minSize: this.minSize,
258 maxSize: this.maxSize,
259 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
260 .runTime.aggregate &&
261 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
262 .waitTime.aggregate && { utilization: round(this.utilization) }),
263 workerNodes: this.workerNodes.length,
264 idleWorkerNodes: this.workerNodes.reduce(
265 (accumulator, workerNode) =>
266 workerNode.usage.tasks.executing === 0
267 ? accumulator + 1
268 : accumulator,
269 0
270 ),
271 busyWorkerNodes: this.workerNodes.reduce(
272 (accumulator, workerNode) =>
273 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
274 0
275 ),
276 executedTasks: this.workerNodes.reduce(
277 (accumulator, workerNode) =>
278 accumulator + workerNode.usage.tasks.executed,
279 0
280 ),
281 executingTasks: this.workerNodes.reduce(
282 (accumulator, workerNode) =>
283 accumulator + workerNode.usage.tasks.executing,
284 0
285 ),
286 queuedTasks: this.workerNodes.reduce(
287 (accumulator, workerNode) =>
288 accumulator + workerNode.usage.tasks.queued,
289 0
290 ),
291 maxQueuedTasks: this.workerNodes.reduce(
292 (accumulator, workerNode) =>
293 accumulator + workerNode.usage.tasks.maxQueued,
294 0
295 ),
296 failedTasks: this.workerNodes.reduce(
297 (accumulator, workerNode) =>
298 accumulator + workerNode.usage.tasks.failed,
299 0
300 ),
301 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
302 .runTime.aggregate && {
303 runTime: {
304 minimum: Math.min(
305 ...this.workerNodes.map(
306 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
307 )
308 ),
309 maximum: Math.max(
310 ...this.workerNodes.map(
311 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
312 )
313 )
314 }
315 }),
316 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
317 .waitTime.aggregate && {
318 waitTime: {
319 minimum: Math.min(
320 ...this.workerNodes.map(
321 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
322 )
323 ),
324 maximum: Math.max(
325 ...this.workerNodes.map(
326 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
327 )
328 )
329 }
330 })
331 }
332 }
333
334 /**
335 * Gets the approximate pool utilization.
336 *
337 * @returns The pool utilization.
338 */
339 private get utilization (): number {
340 const poolRunTimeCapacity =
341 (performance.now() - this.startTimestamp) * this.maxSize
342 const totalTasksRunTime = this.workerNodes.reduce(
343 (accumulator, workerNode) =>
344 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
345 0
346 )
347 const totalTasksWaitTime = this.workerNodes.reduce(
348 (accumulator, workerNode) =>
349 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
350 0
351 )
352 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
353 }
354
355 /**
356 * Pool type.
357 *
358 * If it is `'dynamic'`, it provides the `max` property.
359 */
360 protected abstract get type (): PoolType
361
362 /**
363 * Gets the worker type.
364 */
365 protected abstract get worker (): WorkerType
366
367 /**
368 * Pool minimum size.
369 */
370 protected abstract get minSize (): number
371
372 /**
373 * Pool maximum size.
374 */
375 protected abstract get maxSize (): number
376
377 /**
378 * Get the worker given its id.
379 *
380 * @param workerId - The worker id.
381 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
382 */
383 private getWorkerById (workerId: number): Worker | undefined {
384 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
385 ?.worker
386 }
387
388 /**
389 * Gets the given worker its worker node key.
390 *
391 * @param worker - The worker.
392 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
393 */
394 private getWorkerNodeKey (worker: Worker): number {
395 return this.workerNodes.findIndex(
396 workerNode => workerNode.worker === worker
397 )
398 }
399
400 /** @inheritDoc */
401 public setWorkerChoiceStrategy (
402 workerChoiceStrategy: WorkerChoiceStrategy,
403 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
404 ): void {
405 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
406 this.opts.workerChoiceStrategy = workerChoiceStrategy
407 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
408 this.opts.workerChoiceStrategy
409 )
410 if (workerChoiceStrategyOptions != null) {
411 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
412 }
413 for (const workerNode of this.workerNodes) {
414 this.setWorkerNodeTasksUsage(
415 workerNode,
416 this.getInitialWorkerUsage(workerNode.worker)
417 )
418 this.setWorkerStatistics(workerNode.worker)
419 }
420 }
421
422 /** @inheritDoc */
423 public setWorkerChoiceStrategyOptions (
424 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
425 ): void {
426 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
427 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
428 this.workerChoiceStrategyContext.setOptions(
429 this.opts.workerChoiceStrategyOptions
430 )
431 }
432
433 /** @inheritDoc */
434 public enableTasksQueue (
435 enable: boolean,
436 tasksQueueOptions?: TasksQueueOptions
437 ): void {
438 if (this.opts.enableTasksQueue === true && !enable) {
439 this.flushTasksQueues()
440 }
441 this.opts.enableTasksQueue = enable
442 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
443 }
444
445 /** @inheritDoc */
446 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
447 if (this.opts.enableTasksQueue === true) {
448 this.checkValidTasksQueueOptions(tasksQueueOptions)
449 this.opts.tasksQueueOptions =
450 this.buildTasksQueueOptions(tasksQueueOptions)
451 } else if (this.opts.tasksQueueOptions != null) {
452 delete this.opts.tasksQueueOptions
453 }
454 }
455
456 private buildTasksQueueOptions (
457 tasksQueueOptions: TasksQueueOptions
458 ): TasksQueueOptions {
459 return {
460 concurrency: tasksQueueOptions?.concurrency ?? 1
461 }
462 }
463
464 /**
465 * Whether the pool is full or not.
466 *
467 * The pool filling boolean status.
468 */
469 protected get full (): boolean {
470 return this.workerNodes.length >= this.maxSize
471 }
472
473 /**
474 * Whether the pool is busy or not.
475 *
476 * The pool busyness boolean status.
477 */
478 protected abstract get busy (): boolean
479
480 /**
481 * Whether worker nodes are executing at least one task.
482 *
483 * @returns Worker nodes busyness boolean status.
484 */
485 protected internalBusy (): boolean {
486 return (
487 this.workerNodes.findIndex(workerNode => {
488 return workerNode.usage.tasks.executing === 0
489 }) === -1
490 )
491 }
492
493 /** @inheritDoc */
494 public async execute (data?: Data, name?: string): Promise<Response> {
495 const timestamp = performance.now()
496 const workerNodeKey = this.chooseWorkerNode()
497 const submittedTask: Task<Data> = {
498 name,
499 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
500 data: data ?? ({} as Data),
501 timestamp,
502 id: randomUUID()
503 }
504 const res = new Promise<Response>((resolve, reject) => {
505 this.promiseResponseMap.set(submittedTask.id as string, {
506 resolve,
507 reject,
508 worker: this.workerNodes[workerNodeKey].worker
509 })
510 })
511 if (
512 this.opts.enableTasksQueue === true &&
513 (this.busy ||
514 this.workerNodes[workerNodeKey].usage.tasks.executing >=
515 ((this.opts.tasksQueueOptions as TasksQueueOptions)
516 .concurrency as number))
517 ) {
518 this.enqueueTask(workerNodeKey, submittedTask)
519 } else {
520 this.executeTask(workerNodeKey, submittedTask)
521 }
522 this.checkAndEmitEvents()
523 // eslint-disable-next-line @typescript-eslint/return-await
524 return res
525 }
526
527 /** @inheritDoc */
528 public async destroy (): Promise<void> {
529 await Promise.all(
530 this.workerNodes.map(async (workerNode, workerNodeKey) => {
531 this.flushTasksQueue(workerNodeKey)
532 // FIXME: wait for tasks to be finished
533 const workerExitPromise = new Promise<void>(resolve => {
534 workerNode.worker.on('exit', () => {
535 resolve()
536 })
537 })
538 await this.destroyWorker(workerNode.worker)
539 await workerExitPromise
540 })
541 )
542 }
543
544 /**
545 * Terminates the given worker.
546 *
547 * @param worker - A worker within `workerNodes`.
548 */
549 protected abstract destroyWorker (worker: Worker): void | Promise<void>
550
551 /**
552 * Setup hook to execute code before worker nodes are created in the abstract constructor.
553 * Can be overridden.
554 *
555 * @virtual
556 */
557 protected setupHook (): void {
558 // Intentionally empty
559 }
560
561 /**
562 * Should return whether the worker is the main worker or not.
563 */
564 protected abstract isMain (): boolean
565
566 /**
567 * Hook executed before the worker task execution.
568 * Can be overridden.
569 *
570 * @param workerNodeKey - The worker node key.
571 * @param task - The task to execute.
572 */
573 protected beforeTaskExecutionHook (
574 workerNodeKey: number,
575 task: Task<Data>
576 ): void {
577 const workerUsage = this.workerNodes[workerNodeKey].usage
578 ++workerUsage.tasks.executing
579 this.updateWaitTimeWorkerUsage(workerUsage, task)
580 }
581
582 /**
583 * Hook executed after the worker task execution.
584 * Can be overridden.
585 *
586 * @param worker - The worker.
587 * @param message - The received message.
588 */
589 protected afterTaskExecutionHook (
590 worker: Worker,
591 message: MessageValue<Response>
592 ): void {
593 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
594 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
595 this.updateRunTimeWorkerUsage(workerUsage, message)
596 this.updateEluWorkerUsage(workerUsage, message)
597 }
598
599 private updateTaskStatisticsWorkerUsage (
600 workerUsage: WorkerUsage,
601 message: MessageValue<Response>
602 ): void {
603 const workerTaskStatistics = workerUsage.tasks
604 --workerTaskStatistics.executing
605 ++workerTaskStatistics.executed
606 if (message.taskError != null) {
607 ++workerTaskStatistics.failed
608 }
609 }
610
611 private updateRunTimeWorkerUsage (
612 workerUsage: WorkerUsage,
613 message: MessageValue<Response>
614 ): void {
615 if (
616 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
617 .aggregate
618 ) {
619 const taskRunTime = message.taskPerformance?.runTime ?? 0
620 workerUsage.runTime.aggregate =
621 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
622 workerUsage.runTime.minimum = Math.min(
623 taskRunTime,
624 workerUsage.runTime?.minimum ?? Infinity
625 )
626 workerUsage.runTime.maximum = Math.max(
627 taskRunTime,
628 workerUsage.runTime?.maximum ?? -Infinity
629 )
630 if (
631 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
632 .average &&
633 workerUsage.tasks.executed !== 0
634 ) {
635 workerUsage.runTime.average =
636 workerUsage.runTime.aggregate /
637 (workerUsage.tasks.executed - workerUsage.tasks.failed)
638 }
639 if (
640 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
641 .median &&
642 message.taskPerformance?.runTime != null
643 ) {
644 workerUsage.runTime.history.push(message.taskPerformance.runTime)
645 workerUsage.runTime.median = median(workerUsage.runTime.history)
646 }
647 }
648 }
649
650 private updateWaitTimeWorkerUsage (
651 workerUsage: WorkerUsage,
652 task: Task<Data>
653 ): void {
654 const timestamp = performance.now()
655 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
656 if (
657 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
658 .aggregate
659 ) {
660 workerUsage.waitTime.aggregate =
661 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
662 workerUsage.waitTime.minimum = Math.min(
663 taskWaitTime,
664 workerUsage.waitTime?.minimum ?? Infinity
665 )
666 workerUsage.waitTime.maximum = Math.max(
667 taskWaitTime,
668 workerUsage.waitTime?.maximum ?? -Infinity
669 )
670 if (
671 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
672 .waitTime.average &&
673 workerUsage.tasks.executed !== 0
674 ) {
675 workerUsage.waitTime.average =
676 workerUsage.waitTime.aggregate /
677 (workerUsage.tasks.executed - workerUsage.tasks.failed)
678 }
679 if (
680 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
681 .waitTime.median
682 ) {
683 workerUsage.waitTime.history.push(taskWaitTime)
684 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
685 }
686 }
687 }
688
689 private updateEluWorkerUsage (
690 workerUsage: WorkerUsage,
691 message: MessageValue<Response>
692 ): void {
693 if (
694 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
695 .aggregate
696 ) {
697 if (message.taskPerformance?.elu != null) {
698 workerUsage.elu.idle.aggregate =
699 (workerUsage.elu.idle?.aggregate ?? 0) +
700 message.taskPerformance.elu.idle
701 workerUsage.elu.active.aggregate =
702 (workerUsage.elu.active?.aggregate ?? 0) +
703 message.taskPerformance.elu.active
704 if (workerUsage.elu.utilization != null) {
705 workerUsage.elu.utilization =
706 (workerUsage.elu.utilization +
707 message.taskPerformance.elu.utilization) /
708 2
709 } else {
710 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
711 }
712 workerUsage.elu.idle.minimum = Math.min(
713 message.taskPerformance.elu.idle,
714 workerUsage.elu.idle?.minimum ?? Infinity
715 )
716 workerUsage.elu.idle.maximum = Math.max(
717 message.taskPerformance.elu.idle,
718 workerUsage.elu.idle?.maximum ?? -Infinity
719 )
720 workerUsage.elu.active.minimum = Math.min(
721 message.taskPerformance.elu.active,
722 workerUsage.elu.active?.minimum ?? Infinity
723 )
724 workerUsage.elu.active.maximum = Math.max(
725 message.taskPerformance.elu.active,
726 workerUsage.elu.active?.maximum ?? -Infinity
727 )
728 if (
729 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
730 .average &&
731 workerUsage.tasks.executed !== 0
732 ) {
733 const executedTasks =
734 workerUsage.tasks.executed - workerUsage.tasks.failed
735 workerUsage.elu.idle.average =
736 workerUsage.elu.idle.aggregate / executedTasks
737 workerUsage.elu.active.average =
738 workerUsage.elu.active.aggregate / executedTasks
739 }
740 if (
741 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
742 .median
743 ) {
744 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
745 workerUsage.elu.active.history.push(
746 message.taskPerformance.elu.active
747 )
748 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
749 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
750 }
751 }
752 }
753 }
754
755 /**
756 * Chooses a worker node for the next task.
757 *
758 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
759 *
760 * @returns The worker node key
761 */
762 private chooseWorkerNode (): number {
763 if (this.shallCreateDynamicWorker()) {
764 const worker = this.createAndSetupDynamicWorker()
765 if (
766 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
767 ) {
768 return this.getWorkerNodeKey(worker)
769 }
770 }
771 return this.workerChoiceStrategyContext.execute()
772 }
773
774 /**
775 * Conditions for dynamic worker creation.
776 *
777 * @returns Whether to create a dynamic worker or not.
778 */
779 private shallCreateDynamicWorker (): boolean {
780 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
781 }
782
783 /**
784 * Sends a message to the given worker.
785 *
786 * @param worker - The worker which should receive the message.
787 * @param message - The message.
788 */
789 protected abstract sendToWorker (
790 worker: Worker,
791 message: MessageValue<Data>
792 ): void
793
794 /**
795 * Registers a listener callback on the given worker.
796 *
797 * @param worker - The worker which should register a listener.
798 * @param listener - The message listener callback.
799 */
800 private registerWorkerMessageListener<Message extends Data | Response>(
801 worker: Worker,
802 listener: (message: MessageValue<Message>) => void
803 ): void {
804 worker.on('message', listener as MessageHandler<Worker>)
805 }
806
807 /**
808 * Creates a new worker.
809 *
810 * @returns Newly created worker.
811 */
812 protected abstract createWorker (): Worker
813
814 /**
815 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
816 * Can be overridden.
817 *
818 * @param worker - The newly created worker.
819 */
820 protected afterWorkerSetup (worker: Worker): void {
821 // Listen to worker messages.
822 this.registerWorkerMessageListener(worker, this.workerListener())
823 }
824
825 /**
826 * Creates a new worker and sets it up completely in the pool worker nodes.
827 *
828 * @returns New, completely set up worker.
829 */
830 protected createAndSetupWorker (): Worker {
831 const worker = this.createWorker()
832
833 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
834 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
835 worker.on('error', error => {
836 if (this.emitter != null) {
837 this.emitter.emit(PoolEvents.error, error)
838 }
839 if (this.opts.enableTasksQueue === true) {
840 const workerNodeKey = this.getWorkerNodeKey(worker)
841 while (this.tasksQueueSize(workerNodeKey) > 0) {
842 let targetWorkerNodeKey: number = workerNodeKey
843 let minQueuedTasks = Infinity
844 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
845 if (
846 workerNodeId !== workerNodeKey &&
847 workerNode.usage.tasks.queued === 0
848 ) {
849 targetWorkerNodeKey = workerNodeId
850 break
851 }
852 if (
853 workerNodeId !== workerNodeKey &&
854 workerNode.usage.tasks.queued < minQueuedTasks
855 ) {
856 minQueuedTasks = workerNode.usage.tasks.queued
857 targetWorkerNodeKey = workerNodeId
858 }
859 }
860 this.enqueueTask(
861 targetWorkerNodeKey,
862 this.dequeueTask(workerNodeKey) as Task<Data>
863 )
864 }
865 }
866 if (this.opts.restartWorkerOnError === true) {
867 if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
868 this.createAndSetupDynamicWorker()
869 } else {
870 this.createAndSetupWorker()
871 }
872 }
873 })
874 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
875 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
876 worker.once('exit', () => {
877 this.removeWorkerNode(worker)
878 })
879
880 this.pushWorkerNode(worker)
881
882 this.setWorkerStatistics(worker)
883
884 this.afterWorkerSetup(worker)
885
886 return worker
887 }
888
889 /**
890 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
891 *
892 * @returns New, completely set up dynamic worker.
893 */
894 protected createAndSetupDynamicWorker (): Worker {
895 const worker = this.createAndSetupWorker()
896 this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
897 this.registerWorkerMessageListener(worker, message => {
898 const workerNodeKey = this.getWorkerNodeKey(worker)
899 if (
900 isKillBehavior(KillBehaviors.HARD, message.kill) ||
901 (message.kill != null &&
902 ((this.opts.enableTasksQueue === false &&
903 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
904 (this.opts.enableTasksQueue === true &&
905 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
906 this.tasksQueueSize(workerNodeKey) === 0)))
907 ) {
908 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
909 void (this.destroyWorker(worker) as Promise<void>)
910 }
911 })
912 return worker
913 }
914
915 /**
916 * This function is the listener registered for each worker message.
917 *
918 * @returns The listener function to execute when a message is received from a worker.
919 */
920 protected workerListener (): (message: MessageValue<Response>) => void {
921 return message => {
922 if (message.workerId != null && message.started != null) {
923 // Worker started message received
924 this.handleWorkerStartedMessage(message)
925 } else if (message.id != null) {
926 // Task execution response received
927 this.handleTaskExecutionResponse(message)
928 }
929 }
930 }
931
932 private handleWorkerStartedMessage (message: MessageValue<Response>): void {
933 // Worker started message received
934 const worker = this.getWorkerById(message.workerId as number)
935 if (worker != null) {
936 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
937 message.started as boolean
938 } else {
939 throw new Error(
940 `Worker started message received from unknown worker '${
941 message.workerId as number
942 }'`
943 )
944 }
945 }
946
947 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
948 const promiseResponse = this.promiseResponseMap.get(message.id as string)
949 if (promiseResponse != null) {
950 if (message.taskError != null) {
951 if (this.emitter != null) {
952 this.emitter.emit(PoolEvents.taskError, message.taskError)
953 }
954 promiseResponse.reject(message.taskError.message)
955 } else {
956 promiseResponse.resolve(message.data as Response)
957 }
958 this.afterTaskExecutionHook(promiseResponse.worker, message)
959 this.promiseResponseMap.delete(message.id as string)
960 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
961 if (
962 this.opts.enableTasksQueue === true &&
963 this.tasksQueueSize(workerNodeKey) > 0
964 ) {
965 this.executeTask(
966 workerNodeKey,
967 this.dequeueTask(workerNodeKey) as Task<Data>
968 )
969 }
970 this.workerChoiceStrategyContext.update(workerNodeKey)
971 }
972 }
973
974 private checkAndEmitEvents (): void {
975 if (this.emitter != null) {
976 if (this.busy) {
977 this.emitter.emit(PoolEvents.busy, this.info)
978 }
979 if (this.type === PoolTypes.dynamic && this.full) {
980 this.emitter.emit(PoolEvents.full, this.info)
981 }
982 }
983 }
984
985 /**
986 * Sets the given worker node its tasks usage in the pool.
987 *
988 * @param workerNode - The worker node.
989 * @param workerUsage - The worker usage.
990 */
991 private setWorkerNodeTasksUsage (
992 workerNode: WorkerNode<Worker, Data>,
993 workerUsage: WorkerUsage
994 ): void {
995 workerNode.usage = workerUsage
996 }
997
998 /**
999 * Gets the worker information.
1000 *
1001 * @param workerNodeKey - The worker node key.
1002 */
1003 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1004 return this.workerNodes[workerNodeKey].info
1005 }
1006
1007 /**
1008 * Pushes the given worker in the pool worker nodes.
1009 *
1010 * @param worker - The worker.
1011 * @returns The worker nodes length.
1012 */
1013 private pushWorkerNode (worker: Worker): number {
1014 this.workerNodes.push({
1015 worker,
1016 info: this.getInitialWorkerInfo(worker),
1017 usage: this.getInitialWorkerUsage(),
1018 tasksQueue: new Queue<Task<Data>>()
1019 })
1020 this.setWorkerNodeTasksUsage(
1021 this.workerNodes[this.getWorkerNodeKey(worker)],
1022 this.getInitialWorkerUsage(worker)
1023 )
1024 return this.workerNodes.length
1025 }
1026
1027 /**
1028 * Gets the worker id.
1029 *
1030 * @param worker - The worker.
1031 * @returns The worker id.
1032 */
1033 private getWorkerId (worker: Worker): number | undefined {
1034 if (this.worker === WorkerTypes.thread) {
1035 return worker.threadId
1036 } else if (this.worker === WorkerTypes.cluster) {
1037 return worker.id
1038 }
1039 }
1040
1041 // /**
1042 // * Sets the given worker in the pool worker nodes.
1043 // *
1044 // * @param workerNodeKey - The worker node key.
1045 // * @param worker - The worker.
1046 // * @param workerInfo - The worker info.
1047 // * @param workerUsage - The worker usage.
1048 // * @param tasksQueue - The worker task queue.
1049 // */
1050 // private setWorkerNode (
1051 // workerNodeKey: number,
1052 // worker: Worker,
1053 // workerInfo: WorkerInfo,
1054 // workerUsage: WorkerUsage,
1055 // tasksQueue: Queue<Task<Data>>
1056 // ): void {
1057 // this.workerNodes[workerNodeKey] = {
1058 // worker,
1059 // info: workerInfo,
1060 // usage: workerUsage,
1061 // tasksQueue
1062 // }
1063 // }
1064
1065 /**
1066 * Removes the given worker from the pool worker nodes.
1067 *
1068 * @param worker - The worker.
1069 */
1070 private removeWorkerNode (worker: Worker): void {
1071 const workerNodeKey = this.getWorkerNodeKey(worker)
1072 if (workerNodeKey !== -1) {
1073 this.workerNodes.splice(workerNodeKey, 1)
1074 this.workerChoiceStrategyContext.remove(workerNodeKey)
1075 }
1076 }
1077
1078 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1079 this.beforeTaskExecutionHook(workerNodeKey, task)
1080 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1081 }
1082
1083 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1084 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
1085 }
1086
1087 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1088 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
1089 }
1090
1091 private tasksQueueSize (workerNodeKey: number): number {
1092 return this.workerNodes[workerNodeKey].tasksQueue.size
1093 }
1094
1095 private tasksMaxQueueSize (workerNodeKey: number): number {
1096 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
1097 }
1098
1099 private flushTasksQueue (workerNodeKey: number): void {
1100 while (this.tasksQueueSize(workerNodeKey) > 0) {
1101 this.executeTask(
1102 workerNodeKey,
1103 this.dequeueTask(workerNodeKey) as Task<Data>
1104 )
1105 }
1106 this.workerNodes[workerNodeKey].tasksQueue.clear()
1107 }
1108
1109 private flushTasksQueues (): void {
1110 for (const [workerNodeKey] of this.workerNodes.entries()) {
1111 this.flushTasksQueue(workerNodeKey)
1112 }
1113 }
1114
1115 private setWorkerStatistics (worker: Worker): void {
1116 this.sendToWorker(worker, {
1117 statistics: {
1118 runTime:
1119 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1120 .runTime.aggregate,
1121 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1122 .elu.aggregate
1123 }
1124 })
1125 }
1126
1127 private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
1128 const getTasksQueueSize = (worker?: Worker): number => {
1129 if (worker == null) {
1130 return 0
1131 }
1132 return this.tasksQueueSize(this.getWorkerNodeKey(worker))
1133 }
1134 const getTasksMaxQueueSize = (worker?: Worker): number => {
1135 if (worker == null) {
1136 return 0
1137 }
1138 return this.tasksMaxQueueSize(this.getWorkerNodeKey(worker))
1139 }
1140 return {
1141 tasks: {
1142 executed: 0,
1143 executing: 0,
1144 get queued (): number {
1145 return getTasksQueueSize(worker)
1146 },
1147 get maxQueued (): number {
1148 return getTasksMaxQueueSize(worker)
1149 },
1150 failed: 0
1151 },
1152 runTime: {
1153 history: new CircularArray()
1154 },
1155 waitTime: {
1156 history: new CircularArray()
1157 },
1158 elu: {
1159 idle: {
1160 history: new CircularArray()
1161 },
1162 active: {
1163 history: new CircularArray()
1164 }
1165 }
1166 }
1167 }
1168
1169 private getInitialWorkerInfo (worker: Worker): WorkerInfo {
1170 return { id: this.getWorkerId(worker), dynamic: false, started: true }
1171 }
1172 }