feat: add pool and worker readyness tracking infrastructure
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11 } from '../utils'
12 import { KillBehaviors } from '../worker/worker-options'
13 import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions
22 } from './pool'
23 import type {
24 IWorker,
25 IWorkerNode,
26 MessageHandler,
27 Task,
28 WorkerInfo,
29 WorkerType,
30 WorkerUsage
31 } from './worker'
32 import {
33 Measurements,
34 WorkerChoiceStrategies,
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
37 } from './selection-strategies/selection-strategies-types'
38 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
39 import { version } from './version'
40 import { WorkerNode } from './worker-node'
41
42 /**
43 * Base class that implements some shared logic for all poolifier pools.
44 *
45 * @typeParam Worker - Type of worker which manages this pool.
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
48 */
49 export abstract class AbstractPool<
50 Worker extends IWorker,
51 Data = unknown,
52 Response = unknown
53 > implements IPool<Worker, Data, Response> {
54 /** @inheritDoc */
55 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
56
57 /** @inheritDoc */
58 public readonly emitter?: PoolEmitter
59
60 /**
61 * The execution response promise map.
62 *
63 * - `key`: The message id of each submitted task.
64 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
65 *
66 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
67 */
68 protected promiseResponseMap: Map<
69 string,
70 PromiseResponseWrapper<Worker, Response>
71 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
72
73 /**
74 * Worker choice strategy context referencing a worker choice algorithm implementation.
75 */
76 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
77 Worker,
78 Data,
79 Response
80 >
81
82 /**
83 * The start timestamp of the pool.
84 */
85 private readonly startTimestamp
86
87 /**
88 * Constructs a new poolifier pool.
89 *
90 * @param numberOfWorkers - Number of workers that this pool should manage.
91 * @param filePath - Path to the worker file.
92 * @param opts - Options for the pool.
93 */
94 public constructor (
95 protected readonly numberOfWorkers: number,
96 protected readonly filePath: string,
97 protected readonly opts: PoolOptions<Worker>
98 ) {
99 if (!this.isMain()) {
100 throw new Error('Cannot start a pool from a worker!')
101 }
102 this.checkNumberOfWorkers(this.numberOfWorkers)
103 this.checkFilePath(this.filePath)
104 this.checkPoolOptions(this.opts)
105
106 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
107 this.executeTask = this.executeTask.bind(this)
108 this.enqueueTask = this.enqueueTask.bind(this)
109 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
110
111 if (this.opts.enableEvents === true) {
112 this.emitter = new PoolEmitter()
113 }
114 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
115 Worker,
116 Data,
117 Response
118 >(
119 this,
120 this.opts.workerChoiceStrategy,
121 this.opts.workerChoiceStrategyOptions
122 )
123
124 this.setupHook()
125
126 while (this.workerNodes.length < this.numberOfWorkers) {
127 this.createAndSetupWorker()
128 }
129
130 this.startTimestamp = performance.now()
131 }
132
133 private checkFilePath (filePath: string): void {
134 if (
135 filePath == null ||
136 (typeof filePath === 'string' && filePath.trim().length === 0)
137 ) {
138 throw new Error('Please specify a file with a worker implementation')
139 }
140 }
141
142 private checkNumberOfWorkers (numberOfWorkers: number): void {
143 if (numberOfWorkers == null) {
144 throw new Error(
145 'Cannot instantiate a pool without specifying the number of workers'
146 )
147 } else if (!Number.isSafeInteger(numberOfWorkers)) {
148 throw new TypeError(
149 'Cannot instantiate a pool with a non safe integer number of workers'
150 )
151 } else if (numberOfWorkers < 0) {
152 throw new RangeError(
153 'Cannot instantiate a pool with a negative number of workers'
154 )
155 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
156 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
157 }
158 }
159
160 protected checkDynamicPoolSize (min: number, max: number): void {
161 if (this.type === PoolTypes.dynamic && min > max) {
162 throw new RangeError(
163 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
164 )
165 } else if (this.type === PoolTypes.dynamic && min === max) {
166 throw new RangeError(
167 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
168 )
169 }
170 }
171
172 private checkPoolOptions (opts: PoolOptions<Worker>): void {
173 if (isPlainObject(opts)) {
174 this.opts.workerChoiceStrategy =
175 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
176 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
177 this.opts.workerChoiceStrategyOptions =
178 opts.workerChoiceStrategyOptions ??
179 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
180 this.checkValidWorkerChoiceStrategyOptions(
181 this.opts.workerChoiceStrategyOptions
182 )
183 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
184 this.opts.enableEvents = opts.enableEvents ?? true
185 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
186 if (this.opts.enableTasksQueue) {
187 this.checkValidTasksQueueOptions(
188 opts.tasksQueueOptions as TasksQueueOptions
189 )
190 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
191 opts.tasksQueueOptions as TasksQueueOptions
192 )
193 }
194 } else {
195 throw new TypeError('Invalid pool options: must be a plain object')
196 }
197 }
198
199 private checkValidWorkerChoiceStrategy (
200 workerChoiceStrategy: WorkerChoiceStrategy
201 ): void {
202 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
203 throw new Error(
204 `Invalid worker choice strategy '${workerChoiceStrategy}'`
205 )
206 }
207 }
208
209 private checkValidWorkerChoiceStrategyOptions (
210 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
211 ): void {
212 if (!isPlainObject(workerChoiceStrategyOptions)) {
213 throw new TypeError(
214 'Invalid worker choice strategy options: must be a plain object'
215 )
216 }
217 if (
218 workerChoiceStrategyOptions.weights != null &&
219 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
220 ) {
221 throw new Error(
222 'Invalid worker choice strategy options: must have a weight for each worker node'
223 )
224 }
225 if (
226 workerChoiceStrategyOptions.measurement != null &&
227 !Object.values(Measurements).includes(
228 workerChoiceStrategyOptions.measurement
229 )
230 ) {
231 throw new Error(
232 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
233 )
234 }
235 }
236
237 private checkValidTasksQueueOptions (
238 tasksQueueOptions: TasksQueueOptions
239 ): void {
240 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
241 throw new TypeError('Invalid tasks queue options: must be a plain object')
242 }
243 if (
244 tasksQueueOptions?.concurrency != null &&
245 !Number.isSafeInteger(tasksQueueOptions.concurrency)
246 ) {
247 throw new TypeError(
248 'Invalid worker tasks concurrency: must be an integer'
249 )
250 }
251 if (
252 tasksQueueOptions?.concurrency != null &&
253 tasksQueueOptions.concurrency <= 0
254 ) {
255 throw new Error(
256 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
257 )
258 }
259 }
260
261 /** @inheritDoc */
262 public get info (): PoolInfo {
263 return {
264 version,
265 type: this.type,
266 worker: this.worker,
267 ready: this.ready,
268 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
269 minSize: this.minSize,
270 maxSize: this.maxSize,
271 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
272 .runTime.aggregate &&
273 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
274 .waitTime.aggregate && { utilization: round(this.utilization) }),
275 workerNodes: this.workerNodes.length,
276 idleWorkerNodes: this.workerNodes.reduce(
277 (accumulator, workerNode) =>
278 workerNode.usage.tasks.executing === 0
279 ? accumulator + 1
280 : accumulator,
281 0
282 ),
283 busyWorkerNodes: this.workerNodes.reduce(
284 (accumulator, workerNode) =>
285 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
286 0
287 ),
288 executedTasks: this.workerNodes.reduce(
289 (accumulator, workerNode) =>
290 accumulator + workerNode.usage.tasks.executed,
291 0
292 ),
293 executingTasks: this.workerNodes.reduce(
294 (accumulator, workerNode) =>
295 accumulator + workerNode.usage.tasks.executing,
296 0
297 ),
298 queuedTasks: this.workerNodes.reduce(
299 (accumulator, workerNode) =>
300 accumulator + workerNode.usage.tasks.queued,
301 0
302 ),
303 maxQueuedTasks: this.workerNodes.reduce(
304 (accumulator, workerNode) =>
305 accumulator + workerNode.usage.tasks.maxQueued,
306 0
307 ),
308 failedTasks: this.workerNodes.reduce(
309 (accumulator, workerNode) =>
310 accumulator + workerNode.usage.tasks.failed,
311 0
312 ),
313 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
314 .runTime.aggregate && {
315 runTime: {
316 minimum: round(
317 Math.min(
318 ...this.workerNodes.map(
319 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
320 )
321 )
322 ),
323 maximum: round(
324 Math.max(
325 ...this.workerNodes.map(
326 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
327 )
328 )
329 ),
330 average: round(
331 this.workerNodes.reduce(
332 (accumulator, workerNode) =>
333 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
334 0
335 ) /
336 this.workerNodes.reduce(
337 (accumulator, workerNode) =>
338 accumulator + (workerNode.usage.tasks?.executed ?? 0),
339 0
340 )
341 ),
342 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
343 .runTime.median && {
344 median: round(
345 median(
346 this.workerNodes.map(
347 workerNode => workerNode.usage.runTime?.median ?? 0
348 )
349 )
350 )
351 })
352 }
353 }),
354 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
355 .waitTime.aggregate && {
356 waitTime: {
357 minimum: round(
358 Math.min(
359 ...this.workerNodes.map(
360 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
361 )
362 )
363 ),
364 maximum: round(
365 Math.max(
366 ...this.workerNodes.map(
367 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
368 )
369 )
370 ),
371 average: round(
372 this.workerNodes.reduce(
373 (accumulator, workerNode) =>
374 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
375 0
376 ) /
377 this.workerNodes.reduce(
378 (accumulator, workerNode) =>
379 accumulator + (workerNode.usage.tasks?.executed ?? 0),
380 0
381 )
382 ),
383 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
384 .waitTime.median && {
385 median: round(
386 median(
387 this.workerNodes.map(
388 workerNode => workerNode.usage.waitTime?.median ?? 0
389 )
390 )
391 )
392 })
393 }
394 })
395 }
396 }
397
398 private get starting (): boolean {
399 return (
400 !this.full ||
401 (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready))
402 )
403 }
404
405 private get ready (): boolean {
406 return (
407 this.full && this.workerNodes.every(workerNode => workerNode.info.ready)
408 )
409 }
410
411 /**
412 * Gets the approximate pool utilization.
413 *
414 * @returns The pool utilization.
415 */
416 private get utilization (): number {
417 const poolRunTimeCapacity =
418 (performance.now() - this.startTimestamp) * this.maxSize
419 const totalTasksRunTime = this.workerNodes.reduce(
420 (accumulator, workerNode) =>
421 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
422 0
423 )
424 const totalTasksWaitTime = this.workerNodes.reduce(
425 (accumulator, workerNode) =>
426 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
427 0
428 )
429 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
430 }
431
432 /**
433 * Pool type.
434 *
435 * If it is `'dynamic'`, it provides the `max` property.
436 */
437 protected abstract get type (): PoolType
438
439 /**
440 * Gets the worker type.
441 */
442 protected abstract get worker (): WorkerType
443
444 /**
445 * Pool minimum size.
446 */
447 protected abstract get minSize (): number
448
449 /**
450 * Pool maximum size.
451 */
452 protected abstract get maxSize (): number
453
454 /**
455 * Get the worker given its id.
456 *
457 * @param workerId - The worker id.
458 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
459 */
460 private getWorkerById (workerId: number): Worker | undefined {
461 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
462 ?.worker
463 }
464
465 /**
466 * Gets the given worker its worker node key.
467 *
468 * @param worker - The worker.
469 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
470 */
471 private getWorkerNodeKey (worker: Worker): number {
472 return this.workerNodes.findIndex(
473 workerNode => workerNode.worker === worker
474 )
475 }
476
477 /** @inheritDoc */
478 public setWorkerChoiceStrategy (
479 workerChoiceStrategy: WorkerChoiceStrategy,
480 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
481 ): void {
482 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
483 this.opts.workerChoiceStrategy = workerChoiceStrategy
484 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
485 this.opts.workerChoiceStrategy
486 )
487 if (workerChoiceStrategyOptions != null) {
488 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
489 }
490 for (const workerNode of this.workerNodes) {
491 workerNode.resetUsage()
492 this.setWorkerStatistics(workerNode.worker)
493 }
494 }
495
496 /** @inheritDoc */
497 public setWorkerChoiceStrategyOptions (
498 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
499 ): void {
500 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
501 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
502 this.workerChoiceStrategyContext.setOptions(
503 this.opts.workerChoiceStrategyOptions
504 )
505 }
506
507 /** @inheritDoc */
508 public enableTasksQueue (
509 enable: boolean,
510 tasksQueueOptions?: TasksQueueOptions
511 ): void {
512 if (this.opts.enableTasksQueue === true && !enable) {
513 this.flushTasksQueues()
514 }
515 this.opts.enableTasksQueue = enable
516 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
517 }
518
519 /** @inheritDoc */
520 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
521 if (this.opts.enableTasksQueue === true) {
522 this.checkValidTasksQueueOptions(tasksQueueOptions)
523 this.opts.tasksQueueOptions =
524 this.buildTasksQueueOptions(tasksQueueOptions)
525 } else if (this.opts.tasksQueueOptions != null) {
526 delete this.opts.tasksQueueOptions
527 }
528 }
529
530 private buildTasksQueueOptions (
531 tasksQueueOptions: TasksQueueOptions
532 ): TasksQueueOptions {
533 return {
534 concurrency: tasksQueueOptions?.concurrency ?? 1
535 }
536 }
537
538 /**
539 * Whether the pool is full or not.
540 *
541 * The pool filling boolean status.
542 */
543 protected get full (): boolean {
544 return this.workerNodes.length >= this.maxSize
545 }
546
547 /**
548 * Whether the pool is busy or not.
549 *
550 * The pool busyness boolean status.
551 */
552 protected abstract get busy (): boolean
553
554 /**
555 * Whether worker nodes are executing at least one task.
556 *
557 * @returns Worker nodes busyness boolean status.
558 */
559 protected internalBusy (): boolean {
560 return (
561 this.workerNodes.findIndex(workerNode => {
562 return workerNode.usage.tasks.executing === 0
563 }) === -1
564 )
565 }
566
567 /** @inheritDoc */
568 public async execute (data?: Data, name?: string): Promise<Response> {
569 const timestamp = performance.now()
570 const workerNodeKey = this.chooseWorkerNode()
571 const submittedTask: Task<Data> = {
572 name,
573 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
574 data: data ?? ({} as Data),
575 timestamp,
576 id: randomUUID()
577 }
578 const res = new Promise<Response>((resolve, reject) => {
579 this.promiseResponseMap.set(submittedTask.id as string, {
580 resolve,
581 reject,
582 worker: this.workerNodes[workerNodeKey].worker
583 })
584 })
585 if (
586 this.opts.enableTasksQueue === true &&
587 (this.busy ||
588 this.workerNodes[workerNodeKey].usage.tasks.executing >=
589 ((this.opts.tasksQueueOptions as TasksQueueOptions)
590 .concurrency as number))
591 ) {
592 this.enqueueTask(workerNodeKey, submittedTask)
593 } else {
594 this.executeTask(workerNodeKey, submittedTask)
595 }
596 this.checkAndEmitEvents()
597 // eslint-disable-next-line @typescript-eslint/return-await
598 return res
599 }
600
601 /** @inheritDoc */
602 public async destroy (): Promise<void> {
603 await Promise.all(
604 this.workerNodes.map(async (workerNode, workerNodeKey) => {
605 this.flushTasksQueue(workerNodeKey)
606 // FIXME: wait for tasks to be finished
607 const workerExitPromise = new Promise<void>(resolve => {
608 workerNode.worker.on('exit', () => {
609 resolve()
610 })
611 })
612 await this.destroyWorker(workerNode.worker)
613 await workerExitPromise
614 })
615 )
616 }
617
618 /**
619 * Terminates the given worker.
620 *
621 * @param worker - A worker within `workerNodes`.
622 */
623 protected abstract destroyWorker (worker: Worker): void | Promise<void>
624
625 /**
626 * Setup hook to execute code before worker nodes are created in the abstract constructor.
627 * Can be overridden.
628 *
629 * @virtual
630 */
631 protected setupHook (): void {
632 // Intentionally empty
633 }
634
635 /**
636 * Should return whether the worker is the main worker or not.
637 */
638 protected abstract isMain (): boolean
639
640 /**
641 * Hook executed before the worker task execution.
642 * Can be overridden.
643 *
644 * @param workerNodeKey - The worker node key.
645 * @param task - The task to execute.
646 */
647 protected beforeTaskExecutionHook (
648 workerNodeKey: number,
649 task: Task<Data>
650 ): void {
651 const workerUsage = this.workerNodes[workerNodeKey].usage
652 ++workerUsage.tasks.executing
653 this.updateWaitTimeWorkerUsage(workerUsage, task)
654 }
655
656 /**
657 * Hook executed after the worker task execution.
658 * Can be overridden.
659 *
660 * @param worker - The worker.
661 * @param message - The received message.
662 */
663 protected afterTaskExecutionHook (
664 worker: Worker,
665 message: MessageValue<Response>
666 ): void {
667 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
668 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
669 this.updateRunTimeWorkerUsage(workerUsage, message)
670 this.updateEluWorkerUsage(workerUsage, message)
671 }
672
673 private updateTaskStatisticsWorkerUsage (
674 workerUsage: WorkerUsage,
675 message: MessageValue<Response>
676 ): void {
677 const workerTaskStatistics = workerUsage.tasks
678 --workerTaskStatistics.executing
679 if (message.taskError == null) {
680 ++workerTaskStatistics.executed
681 } else {
682 ++workerTaskStatistics.failed
683 }
684 }
685
686 private updateRunTimeWorkerUsage (
687 workerUsage: WorkerUsage,
688 message: MessageValue<Response>
689 ): void {
690 if (
691 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
692 .aggregate
693 ) {
694 const taskRunTime = message.taskPerformance?.runTime ?? 0
695 workerUsage.runTime.aggregate =
696 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
697 workerUsage.runTime.minimum = Math.min(
698 taskRunTime,
699 workerUsage.runTime?.minimum ?? Infinity
700 )
701 workerUsage.runTime.maximum = Math.max(
702 taskRunTime,
703 workerUsage.runTime?.maximum ?? -Infinity
704 )
705 if (
706 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
707 .average &&
708 workerUsage.tasks.executed !== 0
709 ) {
710 workerUsage.runTime.average =
711 workerUsage.runTime.aggregate / workerUsage.tasks.executed
712 }
713 if (
714 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
715 .median &&
716 message.taskPerformance?.runTime != null
717 ) {
718 workerUsage.runTime.history.push(message.taskPerformance.runTime)
719 workerUsage.runTime.median = median(workerUsage.runTime.history)
720 }
721 }
722 }
723
724 private updateWaitTimeWorkerUsage (
725 workerUsage: WorkerUsage,
726 task: Task<Data>
727 ): void {
728 const timestamp = performance.now()
729 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
730 if (
731 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
732 .aggregate
733 ) {
734 workerUsage.waitTime.aggregate =
735 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
736 workerUsage.waitTime.minimum = Math.min(
737 taskWaitTime,
738 workerUsage.waitTime?.minimum ?? Infinity
739 )
740 workerUsage.waitTime.maximum = Math.max(
741 taskWaitTime,
742 workerUsage.waitTime?.maximum ?? -Infinity
743 )
744 if (
745 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
746 .waitTime.average &&
747 workerUsage.tasks.executed !== 0
748 ) {
749 workerUsage.waitTime.average =
750 workerUsage.waitTime.aggregate / workerUsage.tasks.executed
751 }
752 if (
753 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
754 .waitTime.median
755 ) {
756 workerUsage.waitTime.history.push(taskWaitTime)
757 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
758 }
759 }
760 }
761
762 private updateEluWorkerUsage (
763 workerUsage: WorkerUsage,
764 message: MessageValue<Response>
765 ): void {
766 if (
767 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
768 .aggregate
769 ) {
770 if (message.taskPerformance?.elu != null) {
771 workerUsage.elu.idle.aggregate =
772 (workerUsage.elu.idle?.aggregate ?? 0) +
773 message.taskPerformance.elu.idle
774 workerUsage.elu.active.aggregate =
775 (workerUsage.elu.active?.aggregate ?? 0) +
776 message.taskPerformance.elu.active
777 if (workerUsage.elu.utilization != null) {
778 workerUsage.elu.utilization =
779 (workerUsage.elu.utilization +
780 message.taskPerformance.elu.utilization) /
781 2
782 } else {
783 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
784 }
785 workerUsage.elu.idle.minimum = Math.min(
786 message.taskPerformance.elu.idle,
787 workerUsage.elu.idle?.minimum ?? Infinity
788 )
789 workerUsage.elu.idle.maximum = Math.max(
790 message.taskPerformance.elu.idle,
791 workerUsage.elu.idle?.maximum ?? -Infinity
792 )
793 workerUsage.elu.active.minimum = Math.min(
794 message.taskPerformance.elu.active,
795 workerUsage.elu.active?.minimum ?? Infinity
796 )
797 workerUsage.elu.active.maximum = Math.max(
798 message.taskPerformance.elu.active,
799 workerUsage.elu.active?.maximum ?? -Infinity
800 )
801 if (
802 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
803 .average &&
804 workerUsage.tasks.executed !== 0
805 ) {
806 workerUsage.elu.idle.average =
807 workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
808 workerUsage.elu.active.average =
809 workerUsage.elu.active.aggregate / workerUsage.tasks.executed
810 }
811 if (
812 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
813 .median
814 ) {
815 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
816 workerUsage.elu.active.history.push(
817 message.taskPerformance.elu.active
818 )
819 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
820 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
821 }
822 }
823 }
824 }
825
826 /**
827 * Chooses a worker node for the next task.
828 *
829 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
830 *
831 * @returns The worker node key
832 */
833 private chooseWorkerNode (): number {
834 if (this.shallCreateDynamicWorker()) {
835 const worker = this.createAndSetupDynamicWorker()
836 if (
837 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
838 ) {
839 return this.getWorkerNodeKey(worker)
840 }
841 }
842 return this.workerChoiceStrategyContext.execute()
843 }
844
845 /**
846 * Conditions for dynamic worker creation.
847 *
848 * @returns Whether to create a dynamic worker or not.
849 */
850 private shallCreateDynamicWorker (): boolean {
851 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
852 }
853
854 /**
855 * Sends a message to the given worker.
856 *
857 * @param worker - The worker which should receive the message.
858 * @param message - The message.
859 */
860 protected abstract sendToWorker (
861 worker: Worker,
862 message: MessageValue<Data>
863 ): void
864
865 /**
866 * Registers a listener callback on the given worker.
867 *
868 * @param worker - The worker which should register a listener.
869 * @param listener - The message listener callback.
870 */
871 private registerWorkerMessageListener<Message extends Data | Response>(
872 worker: Worker,
873 listener: (message: MessageValue<Message>) => void
874 ): void {
875 worker.on('message', listener as MessageHandler<Worker>)
876 }
877
878 /**
879 * Creates a new worker.
880 *
881 * @returns Newly created worker.
882 */
883 protected abstract createWorker (): Worker
884
885 /**
886 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
887 * Can be overridden.
888 *
889 * @param worker - The newly created worker.
890 */
891 protected afterWorkerSetup (worker: Worker): void {
892 // Listen to worker messages.
893 this.registerWorkerMessageListener(worker, this.workerListener())
894 // Send startup message to worker.
895 this.sendToWorker(worker, {
896 ready: false,
897 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id
898 })
899 // Setup worker task statistics computation.
900 this.setWorkerStatistics(worker)
901 }
902
903 /**
904 * Creates a new worker and sets it up completely in the pool worker nodes.
905 *
906 * @returns New, completely set up worker.
907 */
908 protected createAndSetupWorker (): Worker {
909 const worker = this.createWorker()
910
911 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
912 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
913 worker.on('error', error => {
914 if (this.emitter != null) {
915 this.emitter.emit(PoolEvents.error, error)
916 }
917 if (this.opts.enableTasksQueue === true) {
918 this.redistributeQueuedTasks(worker)
919 }
920 if (this.opts.restartWorkerOnError === true && !this.starting) {
921 if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
922 this.createAndSetupDynamicWorker()
923 } else {
924 this.createAndSetupWorker()
925 }
926 }
927 })
928 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
929 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
930 worker.once('exit', () => {
931 this.removeWorkerNode(worker)
932 })
933
934 this.pushWorkerNode(worker)
935
936 this.afterWorkerSetup(worker)
937
938 return worker
939 }
940
941 private redistributeQueuedTasks (worker: Worker): void {
942 const workerNodeKey = this.getWorkerNodeKey(worker)
943 while (this.tasksQueueSize(workerNodeKey) > 0) {
944 let targetWorkerNodeKey: number = workerNodeKey
945 let minQueuedTasks = Infinity
946 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
947 if (
948 workerNodeId !== workerNodeKey &&
949 workerNode.usage.tasks.queued === 0
950 ) {
951 targetWorkerNodeKey = workerNodeId
952 break
953 }
954 if (
955 workerNodeId !== workerNodeKey &&
956 workerNode.usage.tasks.queued < minQueuedTasks
957 ) {
958 minQueuedTasks = workerNode.usage.tasks.queued
959 targetWorkerNodeKey = workerNodeId
960 }
961 }
962 this.enqueueTask(
963 targetWorkerNodeKey,
964 this.dequeueTask(workerNodeKey) as Task<Data>
965 )
966 }
967 }
968
969 /**
970 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
971 *
972 * @returns New, completely set up dynamic worker.
973 */
974 protected createAndSetupDynamicWorker (): Worker {
975 const worker = this.createAndSetupWorker()
976 this.registerWorkerMessageListener(worker, message => {
977 const workerNodeKey = this.getWorkerNodeKey(worker)
978 if (
979 isKillBehavior(KillBehaviors.HARD, message.kill) ||
980 (message.kill != null &&
981 ((this.opts.enableTasksQueue === false &&
982 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
983 (this.opts.enableTasksQueue === true &&
984 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
985 this.tasksQueueSize(workerNodeKey) === 0)))
986 ) {
987 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
988 void (this.destroyWorker(worker) as Promise<void>)
989 }
990 })
991 this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
992 this.sendToWorker(worker, { checkAlive: true })
993 return worker
994 }
995
996 /**
997 * This function is the listener registered for each worker message.
998 *
999 * @returns The listener function to execute when a message is received from a worker.
1000 */
1001 protected workerListener (): (message: MessageValue<Response>) => void {
1002 return message => {
1003 if (message.ready != null && message.workerId != null) {
1004 // Worker ready message received
1005 this.handleWorkerReadyMessage(message)
1006 } else if (message.id != null) {
1007 // Task execution response received
1008 this.handleTaskExecutionResponse(message)
1009 }
1010 }
1011 }
1012
1013 private handleWorkerReadyMessage (message: MessageValue<Response>): void {
1014 const worker = this.getWorkerById(message.workerId as number)
1015 if (worker != null) {
1016 this.getWorkerInfo(this.getWorkerNodeKey(worker)).ready =
1017 message.ready as boolean
1018 } else {
1019 throw new Error(
1020 `Worker ready message received from unknown worker '${
1021 message.workerId as number
1022 }'`
1023 )
1024 }
1025 if (this.emitter != null && this.ready) {
1026 this.emitter.emit(PoolEvents.ready, this.info)
1027 }
1028 }
1029
1030 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1031 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1032 if (promiseResponse != null) {
1033 if (message.taskError != null) {
1034 if (this.emitter != null) {
1035 this.emitter.emit(PoolEvents.taskError, message.taskError)
1036 }
1037 promiseResponse.reject(message.taskError.message)
1038 } else {
1039 promiseResponse.resolve(message.data as Response)
1040 }
1041 this.afterTaskExecutionHook(promiseResponse.worker, message)
1042 this.promiseResponseMap.delete(message.id as string)
1043 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1044 if (
1045 this.opts.enableTasksQueue === true &&
1046 this.tasksQueueSize(workerNodeKey) > 0
1047 ) {
1048 this.executeTask(
1049 workerNodeKey,
1050 this.dequeueTask(workerNodeKey) as Task<Data>
1051 )
1052 }
1053 this.workerChoiceStrategyContext.update(workerNodeKey)
1054 }
1055 }
1056
1057 private checkAndEmitEvents (): void {
1058 if (this.emitter != null) {
1059 if (this.busy) {
1060 this.emitter.emit(PoolEvents.busy, this.info)
1061 }
1062 if (this.type === PoolTypes.dynamic && this.full) {
1063 this.emitter.emit(PoolEvents.full, this.info)
1064 }
1065 }
1066 }
1067
1068 /**
1069 * Gets the worker information.
1070 *
1071 * @param workerNodeKey - The worker node key.
1072 */
1073 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1074 return this.workerNodes[workerNodeKey].info
1075 }
1076
1077 /**
1078 * Pushes the given worker in the pool worker nodes.
1079 *
1080 * @param worker - The worker.
1081 * @returns The worker nodes length.
1082 */
1083 private pushWorkerNode (worker: Worker): number {
1084 return this.workerNodes.push(new WorkerNode(worker, this.worker))
1085 }
1086
1087 /**
1088 * Removes the given worker from the pool worker nodes.
1089 *
1090 * @param worker - The worker.
1091 */
1092 private removeWorkerNode (worker: Worker): void {
1093 const workerNodeKey = this.getWorkerNodeKey(worker)
1094 if (workerNodeKey !== -1) {
1095 this.workerNodes.splice(workerNodeKey, 1)
1096 this.workerChoiceStrategyContext.remove(workerNodeKey)
1097 }
1098 }
1099
1100 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1101 this.beforeTaskExecutionHook(workerNodeKey, task)
1102 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1103 }
1104
1105 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1106 return this.workerNodes[workerNodeKey].enqueueTask(task)
1107 }
1108
1109 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1110 return this.workerNodes[workerNodeKey].dequeueTask()
1111 }
1112
1113 private tasksQueueSize (workerNodeKey: number): number {
1114 return this.workerNodes[workerNodeKey].tasksQueueSize()
1115 }
1116
1117 private flushTasksQueue (workerNodeKey: number): void {
1118 while (this.tasksQueueSize(workerNodeKey) > 0) {
1119 this.executeTask(
1120 workerNodeKey,
1121 this.dequeueTask(workerNodeKey) as Task<Data>
1122 )
1123 }
1124 this.workerNodes[workerNodeKey].clearTasksQueue()
1125 }
1126
1127 private flushTasksQueues (): void {
1128 for (const [workerNodeKey] of this.workerNodes.entries()) {
1129 this.flushTasksQueue(workerNodeKey)
1130 }
1131 }
1132
1133 private setWorkerStatistics (worker: Worker): void {
1134 this.sendToWorker(worker, {
1135 statistics: {
1136 runTime:
1137 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1138 .runTime.aggregate,
1139 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1140 .elu.aggregate
1141 }
1142 })
1143 }
1144 }