fix: fix internal measurements handling
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11 } from '../utils'
12 import { KillBehaviors } from '../worker/worker-options'
13 import { CircularArray } from '../circular-array'
14 import { Queue } from '../queue'
15 import {
16 type IPool,
17 PoolEmitter,
18 PoolEvents,
19 type PoolInfo,
20 type PoolOptions,
21 type PoolType,
22 PoolTypes,
23 type TasksQueueOptions,
24 type WorkerType,
25 WorkerTypes
26 } from './pool'
27 import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerInfo,
32 WorkerNode,
33 WorkerUsage
34 } from './worker'
35 import {
36 Measurements,
37 WorkerChoiceStrategies,
38 type WorkerChoiceStrategy,
39 type WorkerChoiceStrategyOptions
40 } from './selection-strategies/selection-strategies-types'
41 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
42 import { version } from './version'
43
44 /**
45 * Base class that implements some shared logic for all poolifier pools.
46 *
47 * @typeParam Worker - Type of worker which manages this pool.
48 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
49 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
50 */
51 export abstract class AbstractPool<
52 Worker extends IWorker,
53 Data = unknown,
54 Response = unknown
55 > implements IPool<Worker, Data, Response> {
56 /** @inheritDoc */
57 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
58
59 /** @inheritDoc */
60 public readonly emitter?: PoolEmitter
61
62 /**
63 * The execution response promise map.
64 *
65 * - `key`: The message id of each submitted task.
66 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
67 *
68 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
69 */
70 protected promiseResponseMap: Map<
71 string,
72 PromiseResponseWrapper<Worker, Response>
73 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
74
75 /**
76 * Worker choice strategy context referencing a worker choice algorithm implementation.
77 */
78 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
79 Worker,
80 Data,
81 Response
82 >
83
84 /**
85 * The start timestamp of the pool.
86 */
87 private readonly startTimestamp
88
89 /**
90 * Constructs a new poolifier pool.
91 *
92 * @param numberOfWorkers - Number of workers that this pool should manage.
93 * @param filePath - Path to the worker file.
94 * @param opts - Options for the pool.
95 */
96 public constructor (
97 protected readonly numberOfWorkers: number,
98 protected readonly filePath: string,
99 protected readonly opts: PoolOptions<Worker>
100 ) {
101 if (!this.isMain()) {
102 throw new Error('Cannot start a pool from a worker!')
103 }
104 this.checkNumberOfWorkers(this.numberOfWorkers)
105 this.checkFilePath(this.filePath)
106 this.checkPoolOptions(this.opts)
107
108 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
109 this.executeTask = this.executeTask.bind(this)
110 this.enqueueTask = this.enqueueTask.bind(this)
111 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
112
113 if (this.opts.enableEvents === true) {
114 this.emitter = new PoolEmitter()
115 }
116 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
117 Worker,
118 Data,
119 Response
120 >(
121 this,
122 this.opts.workerChoiceStrategy,
123 this.opts.workerChoiceStrategyOptions
124 )
125
126 this.setupHook()
127
128 while (this.workerNodes.length < this.numberOfWorkers) {
129 this.createAndSetupWorker()
130 }
131
132 this.startTimestamp = performance.now()
133 }
134
135 private checkFilePath (filePath: string): void {
136 if (
137 filePath == null ||
138 (typeof filePath === 'string' && filePath.trim().length === 0)
139 ) {
140 throw new Error('Please specify a file with a worker implementation')
141 }
142 }
143
144 private checkNumberOfWorkers (numberOfWorkers: number): void {
145 if (numberOfWorkers == null) {
146 throw new Error(
147 'Cannot instantiate a pool without specifying the number of workers'
148 )
149 } else if (!Number.isSafeInteger(numberOfWorkers)) {
150 throw new TypeError(
151 'Cannot instantiate a pool with a non safe integer number of workers'
152 )
153 } else if (numberOfWorkers < 0) {
154 throw new RangeError(
155 'Cannot instantiate a pool with a negative number of workers'
156 )
157 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
158 throw new Error('Cannot instantiate a fixed pool with no worker')
159 }
160 }
161
162 private checkPoolOptions (opts: PoolOptions<Worker>): void {
163 if (isPlainObject(opts)) {
164 this.opts.workerChoiceStrategy =
165 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
166 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
167 this.opts.workerChoiceStrategyOptions =
168 opts.workerChoiceStrategyOptions ??
169 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
170 this.checkValidWorkerChoiceStrategyOptions(
171 this.opts.workerChoiceStrategyOptions
172 )
173 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
174 this.opts.enableEvents = opts.enableEvents ?? true
175 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
176 if (this.opts.enableTasksQueue) {
177 this.checkValidTasksQueueOptions(
178 opts.tasksQueueOptions as TasksQueueOptions
179 )
180 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
181 opts.tasksQueueOptions as TasksQueueOptions
182 )
183 }
184 } else {
185 throw new TypeError('Invalid pool options: must be a plain object')
186 }
187 }
188
189 private checkValidWorkerChoiceStrategy (
190 workerChoiceStrategy: WorkerChoiceStrategy
191 ): void {
192 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
193 throw new Error(
194 `Invalid worker choice strategy '${workerChoiceStrategy}'`
195 )
196 }
197 }
198
199 private checkValidWorkerChoiceStrategyOptions (
200 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
201 ): void {
202 if (!isPlainObject(workerChoiceStrategyOptions)) {
203 throw new TypeError(
204 'Invalid worker choice strategy options: must be a plain object'
205 )
206 }
207 if (
208 workerChoiceStrategyOptions.weights != null &&
209 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
210 ) {
211 throw new Error(
212 'Invalid worker choice strategy options: must have a weight for each worker node'
213 )
214 }
215 if (
216 workerChoiceStrategyOptions.measurement != null &&
217 !Object.values(Measurements).includes(
218 workerChoiceStrategyOptions.measurement
219 )
220 ) {
221 throw new Error(
222 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
223 )
224 }
225 }
226
227 private checkValidTasksQueueOptions (
228 tasksQueueOptions: TasksQueueOptions
229 ): void {
230 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
231 throw new TypeError('Invalid tasks queue options: must be a plain object')
232 }
233 if (
234 tasksQueueOptions?.concurrency != null &&
235 !Number.isSafeInteger(tasksQueueOptions.concurrency)
236 ) {
237 throw new TypeError(
238 'Invalid worker tasks concurrency: must be an integer'
239 )
240 }
241 if (
242 tasksQueueOptions?.concurrency != null &&
243 tasksQueueOptions.concurrency <= 0
244 ) {
245 throw new Error(
246 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
247 )
248 }
249 }
250
251 /** @inheritDoc */
252 public get info (): PoolInfo {
253 return {
254 version,
255 type: this.type,
256 worker: this.worker,
257 minSize: this.minSize,
258 maxSize: this.maxSize,
259 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
260 .runTime.aggregate &&
261 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
262 .waitTime.aggregate && { utilization: round(this.utilization) }),
263 workerNodes: this.workerNodes.length,
264 idleWorkerNodes: this.workerNodes.reduce(
265 (accumulator, workerNode) =>
266 workerNode.usage.tasks.executing === 0
267 ? accumulator + 1
268 : accumulator,
269 0
270 ),
271 busyWorkerNodes: this.workerNodes.reduce(
272 (accumulator, workerNode) =>
273 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
274 0
275 ),
276 executedTasks: this.workerNodes.reduce(
277 (accumulator, workerNode) =>
278 accumulator + workerNode.usage.tasks.executed,
279 0
280 ),
281 executingTasks: this.workerNodes.reduce(
282 (accumulator, workerNode) =>
283 accumulator + workerNode.usage.tasks.executing,
284 0
285 ),
286 queuedTasks: this.workerNodes.reduce(
287 (accumulator, workerNode) =>
288 accumulator + workerNode.usage.tasks.queued,
289 0
290 ),
291 maxQueuedTasks: this.workerNodes.reduce(
292 (accumulator, workerNode) =>
293 accumulator + workerNode.usage.tasks.maxQueued,
294 0
295 ),
296 failedTasks: this.workerNodes.reduce(
297 (accumulator, workerNode) =>
298 accumulator + workerNode.usage.tasks.failed,
299 0
300 ),
301 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
302 .runTime.aggregate && {
303 runTime: {
304 minimum: Math.min(
305 ...this.workerNodes.map(
306 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
307 )
308 ),
309 maximum: Math.max(
310 ...this.workerNodes.map(
311 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
312 )
313 )
314 }
315 }),
316 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
317 .waitTime.aggregate && {
318 waitTime: {
319 minimum: Math.min(
320 ...this.workerNodes.map(
321 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
322 )
323 ),
324 maximum: Math.max(
325 ...this.workerNodes.map(
326 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
327 )
328 )
329 }
330 })
331 }
332 }
333
334 /**
335 * Gets the approximate pool utilization.
336 *
337 * @returns The pool utilization.
338 */
339 private get utilization (): number {
340 const poolRunTimeCapacity =
341 (performance.now() - this.startTimestamp) * this.maxSize
342 const totalTasksRunTime = this.workerNodes.reduce(
343 (accumulator, workerNode) =>
344 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
345 0
346 )
347 const totalTasksWaitTime = this.workerNodes.reduce(
348 (accumulator, workerNode) =>
349 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
350 0
351 )
352 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
353 }
354
355 /**
356 * Pool type.
357 *
358 * If it is `'dynamic'`, it provides the `max` property.
359 */
360 protected abstract get type (): PoolType
361
362 /**
363 * Gets the worker type.
364 */
365 protected abstract get worker (): WorkerType
366
367 /**
368 * Pool minimum size.
369 */
370 protected abstract get minSize (): number
371
372 /**
373 * Pool maximum size.
374 */
375 protected abstract get maxSize (): number
376
377 /**
378 * Get the worker given its id.
379 *
380 * @param workerId - The worker id.
381 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
382 */
383 private getWorkerById (workerId: number): Worker | undefined {
384 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
385 ?.worker
386 }
387
388 /**
389 * Gets the given worker its worker node key.
390 *
391 * @param worker - The worker.
392 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
393 */
394 private getWorkerNodeKey (worker: Worker): number {
395 return this.workerNodes.findIndex(
396 workerNode => workerNode.worker === worker
397 )
398 }
399
400 /** @inheritDoc */
401 public setWorkerChoiceStrategy (
402 workerChoiceStrategy: WorkerChoiceStrategy,
403 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
404 ): void {
405 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
406 this.opts.workerChoiceStrategy = workerChoiceStrategy
407 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
408 this.opts.workerChoiceStrategy
409 )
410 if (workerChoiceStrategyOptions != null) {
411 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
412 }
413 for (const workerNode of this.workerNodes) {
414 this.setWorkerNodeTasksUsage(
415 workerNode,
416 this.getInitialWorkerUsage(workerNode.worker)
417 )
418 this.setWorkerStatistics(workerNode.worker)
419 }
420 }
421
422 /** @inheritDoc */
423 public setWorkerChoiceStrategyOptions (
424 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
425 ): void {
426 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
427 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
428 this.workerChoiceStrategyContext.setOptions(
429 this.opts.workerChoiceStrategyOptions
430 )
431 }
432
433 /** @inheritDoc */
434 public enableTasksQueue (
435 enable: boolean,
436 tasksQueueOptions?: TasksQueueOptions
437 ): void {
438 if (this.opts.enableTasksQueue === true && !enable) {
439 this.flushTasksQueues()
440 }
441 this.opts.enableTasksQueue = enable
442 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
443 }
444
445 /** @inheritDoc */
446 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
447 if (this.opts.enableTasksQueue === true) {
448 this.checkValidTasksQueueOptions(tasksQueueOptions)
449 this.opts.tasksQueueOptions =
450 this.buildTasksQueueOptions(tasksQueueOptions)
451 } else if (this.opts.tasksQueueOptions != null) {
452 delete this.opts.tasksQueueOptions
453 }
454 }
455
456 private buildTasksQueueOptions (
457 tasksQueueOptions: TasksQueueOptions
458 ): TasksQueueOptions {
459 return {
460 concurrency: tasksQueueOptions?.concurrency ?? 1
461 }
462 }
463
464 /**
465 * Whether the pool is full or not.
466 *
467 * The pool filling boolean status.
468 */
469 protected get full (): boolean {
470 return this.workerNodes.length >= this.maxSize
471 }
472
473 /**
474 * Whether the pool is busy or not.
475 *
476 * The pool busyness boolean status.
477 */
478 protected abstract get busy (): boolean
479
480 /**
481 * Whether worker nodes are executing at least one task.
482 *
483 * @returns Worker nodes busyness boolean status.
484 */
485 protected internalBusy (): boolean {
486 return (
487 this.workerNodes.findIndex(workerNode => {
488 return workerNode.usage.tasks.executing === 0
489 }) === -1
490 )
491 }
492
493 /** @inheritDoc */
494 public async execute (data?: Data, name?: string): Promise<Response> {
495 const timestamp = performance.now()
496 const workerNodeKey = this.chooseWorkerNode()
497 const submittedTask: Task<Data> = {
498 name,
499 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
500 data: data ?? ({} as Data),
501 timestamp,
502 id: randomUUID()
503 }
504 const res = new Promise<Response>((resolve, reject) => {
505 this.promiseResponseMap.set(submittedTask.id as string, {
506 resolve,
507 reject,
508 worker: this.workerNodes[workerNodeKey].worker
509 })
510 })
511 if (
512 this.opts.enableTasksQueue === true &&
513 (this.busy ||
514 this.workerNodes[workerNodeKey].usage.tasks.executing >=
515 ((this.opts.tasksQueueOptions as TasksQueueOptions)
516 .concurrency as number))
517 ) {
518 this.enqueueTask(workerNodeKey, submittedTask)
519 } else {
520 this.executeTask(workerNodeKey, submittedTask)
521 }
522 this.checkAndEmitEvents()
523 // eslint-disable-next-line @typescript-eslint/return-await
524 return res
525 }
526
527 /** @inheritDoc */
528 public async destroy (): Promise<void> {
529 await Promise.all(
530 this.workerNodes.map(async (workerNode, workerNodeKey) => {
531 this.flushTasksQueue(workerNodeKey)
532 // FIXME: wait for tasks to be finished
533 const workerExitPromise = new Promise<void>(resolve => {
534 workerNode.worker.on('exit', () => {
535 resolve()
536 })
537 })
538 await this.destroyWorker(workerNode.worker)
539 await workerExitPromise
540 })
541 )
542 }
543
544 /**
545 * Terminates the given worker.
546 *
547 * @param worker - A worker within `workerNodes`.
548 */
549 protected abstract destroyWorker (worker: Worker): void | Promise<void>
550
551 /**
552 * Setup hook to execute code before worker nodes are created in the abstract constructor.
553 * Can be overridden.
554 *
555 * @virtual
556 */
557 protected setupHook (): void {
558 // Intentionally empty
559 }
560
561 /**
562 * Should return whether the worker is the main worker or not.
563 */
564 protected abstract isMain (): boolean
565
566 /**
567 * Hook executed before the worker task execution.
568 * Can be overridden.
569 *
570 * @param workerNodeKey - The worker node key.
571 * @param task - The task to execute.
572 */
573 protected beforeTaskExecutionHook (
574 workerNodeKey: number,
575 task: Task<Data>
576 ): void {
577 const workerUsage = this.workerNodes[workerNodeKey].usage
578 ++workerUsage.tasks.executing
579 this.updateWaitTimeWorkerUsage(workerUsage, task)
580 }
581
582 /**
583 * Hook executed after the worker task execution.
584 * Can be overridden.
585 *
586 * @param worker - The worker.
587 * @param message - The received message.
588 */
589 protected afterTaskExecutionHook (
590 worker: Worker,
591 message: MessageValue<Response>
592 ): void {
593 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
594 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
595 this.updateRunTimeWorkerUsage(workerUsage, message)
596 this.updateEluWorkerUsage(workerUsage, message)
597 }
598
599 private updateTaskStatisticsWorkerUsage (
600 workerUsage: WorkerUsage,
601 message: MessageValue<Response>
602 ): void {
603 const workerTaskStatistics = workerUsage.tasks
604 --workerTaskStatistics.executing
605 ++workerTaskStatistics.executed
606 if (message.taskError != null) {
607 ++workerTaskStatistics.failed
608 }
609 }
610
611 private updateRunTimeWorkerUsage (
612 workerUsage: WorkerUsage,
613 message: MessageValue<Response>
614 ): void {
615 if (
616 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
617 .aggregate
618 ) {
619 const taskRunTime = message.taskPerformance?.runTime ?? 0
620 workerUsage.runTime.aggregate =
621 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
622 workerUsage.runTime.minimum = Math.min(
623 taskRunTime,
624 workerUsage.runTime?.minimum ?? Infinity
625 )
626 workerUsage.runTime.maximum = Math.max(
627 taskRunTime,
628 workerUsage.runTime?.maximum ?? -Infinity
629 )
630 if (
631 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
632 .average &&
633 workerUsage.tasks.executed !== 0
634 ) {
635 workerUsage.runTime.average =
636 workerUsage.runTime.aggregate /
637 (workerUsage.tasks.executed - workerUsage.tasks.failed)
638 }
639 if (
640 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
641 .median &&
642 message.taskPerformance?.runTime != null
643 ) {
644 workerUsage.runTime.history.push(message.taskPerformance.runTime)
645 workerUsage.runTime.median = median(workerUsage.runTime.history)
646 }
647 }
648 }
649
650 private updateWaitTimeWorkerUsage (
651 workerUsage: WorkerUsage,
652 task: Task<Data>
653 ): void {
654 const timestamp = performance.now()
655 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
656 if (
657 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
658 .aggregate
659 ) {
660 workerUsage.waitTime.aggregate =
661 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
662 workerUsage.waitTime.minimum = Math.min(
663 taskWaitTime,
664 workerUsage.waitTime?.minimum ?? Infinity
665 )
666 workerUsage.waitTime.maximum = Math.max(
667 taskWaitTime,
668 workerUsage.waitTime?.maximum ?? -Infinity
669 )
670 if (
671 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
672 .waitTime.average &&
673 workerUsage.tasks.executed !== 0
674 ) {
675 workerUsage.waitTime.average =
676 workerUsage.waitTime.aggregate /
677 (workerUsage.tasks.executed - workerUsage.tasks.failed)
678 }
679 if (
680 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
681 .waitTime.median
682 ) {
683 workerUsage.waitTime.history.push(taskWaitTime)
684 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
685 }
686 }
687 }
688
689 private updateEluWorkerUsage (
690 workerUsage: WorkerUsage,
691 message: MessageValue<Response>
692 ): void {
693 if (
694 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
695 .aggregate
696 ) {
697 if (message.taskPerformance?.elu != null) {
698 workerUsage.elu.idle.aggregate =
699 (workerUsage.elu.idle?.aggregate ?? 0) +
700 message.taskPerformance.elu.idle
701 workerUsage.elu.active.aggregate =
702 (workerUsage.elu.active?.aggregate ?? 0) +
703 message.taskPerformance.elu.active
704 if (workerUsage.elu.utilization != null) {
705 workerUsage.elu.utilization =
706 (workerUsage.elu.utilization +
707 message.taskPerformance.elu.utilization) /
708 2
709 } else {
710 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
711 }
712 workerUsage.elu.idle.minimum = Math.min(
713 message.taskPerformance.elu.idle,
714 workerUsage.elu.idle?.minimum ?? Infinity
715 )
716 workerUsage.elu.idle.maximum = Math.max(
717 message.taskPerformance.elu.idle,
718 workerUsage.elu.idle?.maximum ?? -Infinity
719 )
720 workerUsage.elu.active.minimum = Math.min(
721 message.taskPerformance.elu.active,
722 workerUsage.elu.active?.minimum ?? Infinity
723 )
724 workerUsage.elu.active.maximum = Math.max(
725 message.taskPerformance.elu.active,
726 workerUsage.elu.active?.maximum ?? -Infinity
727 )
728 if (
729 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
730 .average &&
731 workerUsage.tasks.executed !== 0
732 ) {
733 const executedTasks =
734 workerUsage.tasks.executed - workerUsage.tasks.failed
735 workerUsage.elu.idle.average =
736 workerUsage.elu.idle.aggregate / executedTasks
737 workerUsage.elu.active.average =
738 workerUsage.elu.active.aggregate / executedTasks
739 }
740 if (
741 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
742 .median &&
743 message.taskPerformance?.elu != null
744 ) {
745 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
746 workerUsage.elu.active.history.push(
747 message.taskPerformance.elu.active
748 )
749 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
750 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
751 }
752 }
753 }
754 }
755
756 /**
757 * Chooses a worker node for the next task.
758 *
759 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
760 *
761 * @returns The worker node key
762 */
763 private chooseWorkerNode (): number {
764 if (this.shallCreateDynamicWorker()) {
765 const worker = this.createAndSetupDynamicWorker()
766 if (
767 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
768 ) {
769 return this.getWorkerNodeKey(worker)
770 }
771 }
772 return this.workerChoiceStrategyContext.execute()
773 }
774
775 /**
776 * Conditions for dynamic worker creation.
777 *
778 * @returns Whether to create a dynamic worker or not.
779 */
780 private shallCreateDynamicWorker (): boolean {
781 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
782 }
783
784 /**
785 * Sends a message to the given worker.
786 *
787 * @param worker - The worker which should receive the message.
788 * @param message - The message.
789 */
790 protected abstract sendToWorker (
791 worker: Worker,
792 message: MessageValue<Data>
793 ): void
794
795 /**
796 * Registers a listener callback on the given worker.
797 *
798 * @param worker - The worker which should register a listener.
799 * @param listener - The message listener callback.
800 */
801 private registerWorkerMessageListener<Message extends Data | Response>(
802 worker: Worker,
803 listener: (message: MessageValue<Message>) => void
804 ): void {
805 worker.on('message', listener as MessageHandler<Worker>)
806 }
807
808 /**
809 * Creates a new worker.
810 *
811 * @returns Newly created worker.
812 */
813 protected abstract createWorker (): Worker
814
815 /**
816 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
817 * Can be overridden.
818 *
819 * @param worker - The newly created worker.
820 */
821 protected afterWorkerSetup (worker: Worker): void {
822 // Listen to worker messages.
823 this.registerWorkerMessageListener(worker, this.workerListener())
824 }
825
826 /**
827 * Creates a new worker and sets it up completely in the pool worker nodes.
828 *
829 * @returns New, completely set up worker.
830 */
831 protected createAndSetupWorker (): Worker {
832 const worker = this.createWorker()
833
834 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
835 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
836 worker.on('error', error => {
837 if (this.emitter != null) {
838 this.emitter.emit(PoolEvents.error, error)
839 }
840 if (this.opts.enableTasksQueue === true) {
841 const workerNodeKey = this.getWorkerNodeKey(worker)
842 while (this.tasksQueueSize(workerNodeKey) > 0) {
843 let targetWorkerNodeKey: number = workerNodeKey
844 let minQueuedTasks = Infinity
845 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
846 if (
847 workerNodeId !== workerNodeKey &&
848 workerNode.usage.tasks.queued === 0
849 ) {
850 targetWorkerNodeKey = workerNodeId
851 break
852 }
853 if (
854 workerNodeId !== workerNodeKey &&
855 workerNode.usage.tasks.queued < minQueuedTasks
856 ) {
857 minQueuedTasks = workerNode.usage.tasks.queued
858 targetWorkerNodeKey = workerNodeId
859 }
860 }
861 this.enqueueTask(
862 targetWorkerNodeKey,
863 this.dequeueTask(workerNodeKey) as Task<Data>
864 )
865 }
866 }
867 if (this.opts.restartWorkerOnError === true) {
868 if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
869 this.createAndSetupDynamicWorker()
870 } else {
871 this.createAndSetupWorker()
872 }
873 }
874 })
875 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
876 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
877 worker.once('exit', () => {
878 this.removeWorkerNode(worker)
879 })
880
881 this.pushWorkerNode(worker)
882
883 this.setWorkerStatistics(worker)
884
885 this.afterWorkerSetup(worker)
886
887 return worker
888 }
889
890 /**
891 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
892 *
893 * @returns New, completely set up dynamic worker.
894 */
895 protected createAndSetupDynamicWorker (): Worker {
896 const worker = this.createAndSetupWorker()
897 this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
898 this.registerWorkerMessageListener(worker, message => {
899 const workerNodeKey = this.getWorkerNodeKey(worker)
900 if (
901 isKillBehavior(KillBehaviors.HARD, message.kill) ||
902 (message.kill != null &&
903 ((this.opts.enableTasksQueue === false &&
904 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
905 (this.opts.enableTasksQueue === true &&
906 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
907 this.tasksQueueSize(workerNodeKey) === 0)))
908 ) {
909 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
910 void (this.destroyWorker(worker) as Promise<void>)
911 }
912 })
913 return worker
914 }
915
916 /**
917 * This function is the listener registered for each worker message.
918 *
919 * @returns The listener function to execute when a message is received from a worker.
920 */
921 protected workerListener (): (message: MessageValue<Response>) => void {
922 return message => {
923 if (message.workerId != null && message.started != null) {
924 // Worker started message received
925 this.handleWorkerStartedMessage(message)
926 } else if (message.id != null) {
927 // Task execution response received
928 this.handleTaskExecutionResponse(message)
929 }
930 }
931 }
932
933 private handleWorkerStartedMessage (message: MessageValue<Response>): void {
934 // Worker started message received
935 const worker = this.getWorkerById(message.workerId as number)
936 if (worker != null) {
937 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
938 message.started as boolean
939 } else {
940 throw new Error(
941 `Worker started message received from unknown worker '${
942 message.workerId as number
943 }'`
944 )
945 }
946 }
947
948 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
949 const promiseResponse = this.promiseResponseMap.get(message.id as string)
950 if (promiseResponse != null) {
951 if (message.taskError != null) {
952 if (this.emitter != null) {
953 this.emitter.emit(PoolEvents.taskError, message.taskError)
954 }
955 promiseResponse.reject(message.taskError.message)
956 } else {
957 promiseResponse.resolve(message.data as Response)
958 }
959 this.afterTaskExecutionHook(promiseResponse.worker, message)
960 this.promiseResponseMap.delete(message.id as string)
961 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
962 if (
963 this.opts.enableTasksQueue === true &&
964 this.tasksQueueSize(workerNodeKey) > 0
965 ) {
966 this.executeTask(
967 workerNodeKey,
968 this.dequeueTask(workerNodeKey) as Task<Data>
969 )
970 }
971 this.workerChoiceStrategyContext.update(workerNodeKey)
972 }
973 }
974
975 private checkAndEmitEvents (): void {
976 if (this.emitter != null) {
977 if (this.busy) {
978 this.emitter.emit(PoolEvents.busy, this.info)
979 }
980 if (this.type === PoolTypes.dynamic && this.full) {
981 this.emitter.emit(PoolEvents.full, this.info)
982 }
983 }
984 }
985
986 /**
987 * Sets the given worker node its tasks usage in the pool.
988 *
989 * @param workerNode - The worker node.
990 * @param workerUsage - The worker usage.
991 */
992 private setWorkerNodeTasksUsage (
993 workerNode: WorkerNode<Worker, Data>,
994 workerUsage: WorkerUsage
995 ): void {
996 workerNode.usage = workerUsage
997 }
998
999 /**
1000 * Gets the worker information.
1001 *
1002 * @param workerNodeKey - The worker node key.
1003 */
1004 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1005 return this.workerNodes[workerNodeKey].info
1006 }
1007
1008 /**
1009 * Pushes the given worker in the pool worker nodes.
1010 *
1011 * @param worker - The worker.
1012 * @returns The worker nodes length.
1013 */
1014 private pushWorkerNode (worker: Worker): number {
1015 this.workerNodes.push({
1016 worker,
1017 info: this.getInitialWorkerInfo(worker),
1018 usage: this.getInitialWorkerUsage(),
1019 tasksQueue: new Queue<Task<Data>>()
1020 })
1021 this.setWorkerNodeTasksUsage(
1022 this.workerNodes[this.getWorkerNodeKey(worker)],
1023 this.getInitialWorkerUsage(worker)
1024 )
1025 return this.workerNodes.length
1026 }
1027
1028 /**
1029 * Gets the worker id.
1030 *
1031 * @param worker - The worker.
1032 * @returns The worker id.
1033 */
1034 private getWorkerId (worker: Worker): number | undefined {
1035 if (this.worker === WorkerTypes.thread) {
1036 return worker.threadId
1037 } else if (this.worker === WorkerTypes.cluster) {
1038 return worker.id
1039 }
1040 }
1041
1042 // /**
1043 // * Sets the given worker in the pool worker nodes.
1044 // *
1045 // * @param workerNodeKey - The worker node key.
1046 // * @param worker - The worker.
1047 // * @param workerInfo - The worker info.
1048 // * @param workerUsage - The worker usage.
1049 // * @param tasksQueue - The worker task queue.
1050 // */
1051 // private setWorkerNode (
1052 // workerNodeKey: number,
1053 // worker: Worker,
1054 // workerInfo: WorkerInfo,
1055 // workerUsage: WorkerUsage,
1056 // tasksQueue: Queue<Task<Data>>
1057 // ): void {
1058 // this.workerNodes[workerNodeKey] = {
1059 // worker,
1060 // info: workerInfo,
1061 // usage: workerUsage,
1062 // tasksQueue
1063 // }
1064 // }
1065
1066 /**
1067 * Removes the given worker from the pool worker nodes.
1068 *
1069 * @param worker - The worker.
1070 */
1071 private removeWorkerNode (worker: Worker): void {
1072 const workerNodeKey = this.getWorkerNodeKey(worker)
1073 if (workerNodeKey !== -1) {
1074 this.workerNodes.splice(workerNodeKey, 1)
1075 this.workerChoiceStrategyContext.remove(workerNodeKey)
1076 }
1077 }
1078
1079 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1080 this.beforeTaskExecutionHook(workerNodeKey, task)
1081 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1082 }
1083
1084 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1085 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
1086 }
1087
1088 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1089 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
1090 }
1091
1092 private tasksQueueSize (workerNodeKey: number): number {
1093 return this.workerNodes[workerNodeKey].tasksQueue.size
1094 }
1095
1096 private tasksMaxQueueSize (workerNodeKey: number): number {
1097 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
1098 }
1099
1100 private flushTasksQueue (workerNodeKey: number): void {
1101 while (this.tasksQueueSize(workerNodeKey) > 0) {
1102 this.executeTask(
1103 workerNodeKey,
1104 this.dequeueTask(workerNodeKey) as Task<Data>
1105 )
1106 }
1107 this.workerNodes[workerNodeKey].tasksQueue.clear()
1108 }
1109
1110 private flushTasksQueues (): void {
1111 for (const [workerNodeKey] of this.workerNodes.entries()) {
1112 this.flushTasksQueue(workerNodeKey)
1113 }
1114 }
1115
1116 private setWorkerStatistics (worker: Worker): void {
1117 this.sendToWorker(worker, {
1118 statistics: {
1119 runTime:
1120 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1121 .runTime.aggregate,
1122 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1123 .elu.aggregate
1124 }
1125 })
1126 }
1127
1128 private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
1129 const getTasksQueueSize = (worker?: Worker): number => {
1130 if (worker == null) {
1131 return 0
1132 }
1133 // FIXME: Workaround tasks queue initialization issue.
1134 try {
1135 return this.tasksQueueSize(this.getWorkerNodeKey(worker))
1136 } catch {
1137 return 0
1138 }
1139 }
1140 const getTasksMaxQueueSize = (worker?: Worker): number => {
1141 if (worker == null) {
1142 return 0
1143 }
1144 // FIXME: Workaround tasks queue initialization issue.
1145 try {
1146 return this.tasksMaxQueueSize(this.getWorkerNodeKey(worker))
1147 } catch {
1148 return 0
1149 }
1150 }
1151 return {
1152 tasks: {
1153 executed: 0,
1154 executing: 0,
1155 get queued (): number {
1156 return getTasksQueueSize(worker)
1157 },
1158 get maxQueued (): number {
1159 return getTasksMaxQueueSize(worker)
1160 },
1161 failed: 0
1162 },
1163 runTime: {
1164 history: new CircularArray()
1165 },
1166 waitTime: {
1167 history: new CircularArray()
1168 },
1169 elu: {
1170 idle: {
1171 history: new CircularArray()
1172 },
1173 active: {
1174 history: new CircularArray()
1175 }
1176 }
1177 }
1178 }
1179
1180 private getInitialWorkerInfo (worker: Worker): WorkerInfo {
1181 return { id: this.getWorkerId(worker), dynamic: false, started: true }
1182 }
1183 }