refactor: cleanup pool startup code
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round
17 } from '../utils'
18 import { KillBehaviors } from '../worker/worker-options'
19 import {
20 type IPool,
21 PoolEmitter,
22 PoolEvents,
23 type PoolInfo,
24 type PoolOptions,
25 type PoolType,
26 PoolTypes,
27 type TasksQueueOptions
28 } from './pool'
29 import type {
30 IWorker,
31 IWorkerNode,
32 MessageHandler,
33 WorkerInfo,
34 WorkerType,
35 WorkerUsage
36 } from './worker'
37 import {
38 Measurements,
39 WorkerChoiceStrategies,
40 type WorkerChoiceStrategy,
41 type WorkerChoiceStrategyOptions
42 } from './selection-strategies/selection-strategies-types'
43 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
44 import { version } from './version'
45 import { WorkerNode } from './worker-node'
46
47 /**
48 * Base class that implements some shared logic for all poolifier pools.
49 *
50 * @typeParam Worker - Type of worker which manages this pool.
51 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
52 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
53 */
54 export abstract class AbstractPool<
55 Worker extends IWorker,
56 Data = unknown,
57 Response = unknown
58 > implements IPool<Worker, Data, Response> {
59 /** @inheritDoc */
60 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
61
62 /** @inheritDoc */
63 public readonly emitter?: PoolEmitter
64
65 /**
66 * The execution response promise map.
67 *
68 * - `key`: The message id of each submitted task.
69 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
70 *
71 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
72 */
73 protected promiseResponseMap: Map<
74 string,
75 PromiseResponseWrapper<Worker, Response>
76 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * Whether the pool is starting or not.
89 */
90 private readonly starting: boolean
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
96 /**
97 * Constructs a new poolifier pool.
98 *
99 * @param numberOfWorkers - Number of workers that this pool should manage.
100 * @param filePath - Path to the worker file.
101 * @param opts - Options for the pool.
102 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // A pool can only be created from the main thread/process.
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Validate the arguments; checkPoolOptions() also fills in the option defaults on `this.opts`.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods that are handed over as callbacks.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The `starting` flag is raised only while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
141
/**
 * Checks that the worker file path is a non-blank string and that something exists at that path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank, or does not exist.
 */
private checkFilePath (filePath: string): void {
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
154
/**
 * Checks that the requested number of workers is valid for this pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error If the number of workers is not specified.
 * @throws TypeError If the number of workers is not a safe integer.
 * @throws RangeError If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only fixed pools reject a zero number of workers here.
  if (numberOfWorkers === 0 && this.type === PoolTypes.fixed) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
172
/**
 * Checks the minimum and maximum sizes of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - Minimum pool size.
 * @param max - Maximum pool size.
 * @throws RangeError If `min` is greater than `max`, if `max` is zero, or if `min` equals `max`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
190
/**
 * Validates the pool options and stores them, with defaults applied, on `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
217
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
227
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError If the options are not a plain object.
 * @throws Error If the number of weights differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, there must be exactly `maxSize` of them.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
255
/**
 * Checks the tasks queue options. Missing options (`null`/`undefined`) are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError If the options are not a plain object, or if the concurrency is not a safe integer.
 * @throws Error If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
279
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non-dynamic worker nodes.
 */
private startPool (): void {
  // Re-evaluated on each iteration since createAndSetupWorker() changes the worker nodes.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }
}
291
292 /** @inheritDoc */
public get info (): PoolInfo {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a per worker node numeric value over all the worker nodes.
  const sumOver = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Collects a per worker node numeric value for all the worker nodes.
  const collect = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number[] => this.workerNodes.map(pick)

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOver(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOver(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOver(workerNode => workerNode.usage.tasks.executed),
    executingTasks: sumOver(workerNode => workerNode.usage.tasks.executing),
    queuedTasks: sumOver(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sumOver(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOver(workerNode => workerNode.usage.tasks.failed),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...collect(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...collect(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Pool wide average: total aggregated run time over total executed tasks.
        average: round(
          sumOver(workerNode => workerNode.usage.runTime?.aggregate ?? 0) /
            sumOver(workerNode => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(
            median(
              collect(workerNode => workerNode.usage.runTime?.median ?? 0)
            )
          )
        })
      }
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...collect(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...collect(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Pool wide average: total aggregated wait time over total executed tasks.
        average: round(
          sumOver(workerNode => workerNode.usage.waitTime?.aggregate ?? 0) /
            sumOver(workerNode => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(
            median(
              collect(workerNode => workerNode.usage.waitTime?.median ?? 0)
            )
          )
        })
      }
    })
  }
}
428
/**
 * Whether the pool is ready: at least `minSize` non-dynamic worker nodes are flagged ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
440
441 /**
442 * Gets the approximate pool utilization.
443 *
444 * @returns The pool utilization.
445 */
private get utilization (): number {
  // Time elapsed since the pool start, multiplied by the pool maximum size.
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
461
462 /**
463 * Pool type.
464 *
465 * If it is `'dynamic'`, it provides the `max` property.
466 */
467 protected abstract get type (): PoolType
468
469 /**
470 * Gets the worker type.
471 */
472 protected abstract get worker (): WorkerType
473
474 /**
475 * Pool minimum size.
476 */
477 protected abstract get minSize (): number
478
479 /**
480 * Pool maximum size.
481 */
482 protected abstract get maxSize (): number
483
484 /**
485 * Get the worker given its id.
486 *
487 * @param workerId - The worker id.
488 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
489 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
494
495 /**
496 * Checks if the worker id sent in the received message from a worker is valid.
497 *
498 * @param message - The received message.
499 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
500 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    // Messages without a worker id are not checked.
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
511
512 /**
513 * Gets the worker node key of the given worker.
514 *
515 * @param worker - The worker.
516 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
517 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  // Same "not found" convention as Array.prototype.findIndex().
  return -1
}
523
524 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Usage is reset and the statistics setup is redone on every worker node after a strategy change.
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
542
543 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options are never stored.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
553
554 /** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush the tasks queues first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
565
566 /** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
576
/**
 * Builds the tasks queue options with defaults applied.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly missing.
 * @returns Tasks queue options whose `concurrency` defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
584
585 /**
586 * Whether the pool is full or not.
587 *
588 * The pool filling boolean status.
589 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
593
594 /**
595 * Whether the pool is busy or not.
596 *
597 * The pool busyness boolean status.
598 */
599 protected abstract get busy (): boolean
600
601 /**
602 * Whether worker nodes are executing at least one task.
603 *
604 * @returns Worker nodes busyness boolean status.
605 */
protected internalBusy (): boolean {
  // Busy means that no worker node is idle, i.e. none has zero executing task.
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
613
614 /** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Submission timestamp, later used to compute the task wait time.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const submittedTask: Task<Data> = {
    name: name ?? DEFAULT_TASK_NAME,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    workerId: this.getWorkerInfo(workerNodeKey).id as number,
    id: randomUUID()
  }
  const workerNode = this.workerNodes[workerNodeKey]
  // The resolve/reject callbacks are stored under the task id until the worker answers.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: workerNode.worker
    })
  })
  // The task is queued when the queue is enabled and either the pool is busy
  // or the chosen worker node already runs `concurrency` tasks.
  const shallEnqueueTask = (): boolean => {
    if (this.opts.enableTasksQueue !== true) {
      return false
    }
    if (this.busy) {
      return true
    }
    return (
      workerNode.usage.tasks.executing >=
      ((this.opts.tasksQueueOptions as TasksQueueOptions)
        .concurrency as number)
    )
  }
  if (shallEnqueueTask()) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
648
649 /** @inheritDoc */
public async destroy (): Promise<void> {
  // Flushes the tasks queue of one worker node, terminates its worker and waits for the 'exit' event.
  const terminateWorkerNode = async (
    workerNode: IWorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // The 'exit' listener is registered before the termination is requested.
    const workerExited = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExited
  }
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) =>
      await terminateWorkerNode(workerNode, workerNodeKey)
  )
  await Promise.all(terminations)
}
665
666 /**
667 * Terminates the given worker.
668 *
669 * @param worker - A worker within `workerNodes`.
670 */
671 protected abstract destroyWorker (worker: Worker): void | Promise<void>
672
673 /**
674 * Setup hook to execute code before worker nodes are created in the abstract constructor.
675 * Can be overridden.
676 *
677 * @virtual
678 */
protected setupHook (): void {
  // Intentionally empty: no-op by default, called by the constructor before the pool is started.
}
682
683 /**
684 * Should return whether the worker is the main worker or not.
685 */
686 protected abstract isMain (): boolean
687
688 /**
689 * Hook executed before the worker task execution.
690 * Can be overridden.
691 *
692 * @param workerNodeKey - The worker node key.
693 * @param task - The task to execute.
694 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Marks one more executing task on the given usage and records the task wait time.
  const recordTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker wide usage first, then the usage of the task function name.
  recordTaskStart(workerNode.usage)
  recordTaskStart(
    workerNode.getTaskWorkerUsage(task.name as string) as WorkerUsage
  )
}
708
709 /**
710 * Hook executed after the worker task execution.
711 * Can be overridden.
712 *
713 * @param worker - The worker.
714 * @param message - The received message.
715 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  // Applies the task counters, run time and ELU updates to the given usage.
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  // Worker wide usage first, then the usage of the task function name.
  recordTaskEnd(workerNode.usage)
  recordTaskEnd(
    workerNode.getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
732
/**
 * Updates the task counters of the given usage once a task execution response is received.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  // A task counts either as failed or as executed, never both.
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
745
/**
 * Updates the run time statistics of the given usage, as far as the current
 * worker choice strategy requires them.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const runTimeUsage = workerUsage.runTime
  const measuredRunTime = message.taskPerformance?.runTime
  // A missing measurement counts as a zero run time in aggregate, minimum and maximum.
  const taskRunTime = measuredRunTime ?? 0
  runTimeUsage.aggregate = (runTimeUsage.aggregate ?? 0) + taskRunTime
  runTimeUsage.minimum = Math.min(
    taskRunTime,
    runTimeUsage.minimum ?? Infinity
  )
  runTimeUsage.maximum = Math.max(
    taskRunTime,
    runTimeUsage.maximum ?? -Infinity
  )
  if (runTimeRequirements.average && workerUsage.tasks.executed !== 0) {
    runTimeUsage.average = runTimeUsage.aggregate / workerUsage.tasks.executed
  }
  // Only real measurements enter the history used for the median.
  if (runTimeRequirements.median && measuredRunTime != null) {
    runTimeUsage.history.push(measuredRunTime)
    runTimeUsage.median = median(runTimeUsage.history)
  }
}
783
/**
 * Updates the wait time statistics of the given usage, as far as the current
 * worker choice strategy requires them.
 *
 * @param workerUsage - The usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  // Wait time: now minus the task timestamp; zero when the task has no timestamp.
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeUsage = workerUsage.waitTime
  waitTimeUsage.aggregate = (waitTimeUsage.aggregate ?? 0) + taskWaitTime
  waitTimeUsage.minimum = Math.min(
    taskWaitTime,
    waitTimeUsage.minimum ?? Infinity
  )
  waitTimeUsage.maximum = Math.max(
    taskWaitTime,
    waitTimeUsage.maximum ?? -Infinity
  )
  if (waitTimeRequirements.average && workerUsage.tasks.executed !== 0) {
    waitTimeUsage.average =
      waitTimeUsage.aggregate / workerUsage.tasks.executed
  }
  if (waitTimeRequirements.median) {
    waitTimeUsage.history.push(taskWaitTime)
    waitTimeUsage.median = median(waitTimeUsage.history)
  }
}
821
/**
 * Updates the event loop utilization (ELU) statistics of the given usage, as far
 * as the current worker choice strategy requires them.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    // No ELU measurement in the message: nothing to update.
    return
  }
  const eluUsage = workerUsage.elu
  eluUsage.idle.aggregate = (eluUsage.idle.aggregate ?? 0) + taskElu.idle
  eluUsage.active.aggregate =
    (eluUsage.active.aggregate ?? 0) + taskElu.active
  // The utilization is the mean of the previous value and the new measurement.
  if (eluUsage.utilization != null) {
    eluUsage.utilization = (eluUsage.utilization + taskElu.utilization) / 2
  } else {
    eluUsage.utilization = taskElu.utilization
  }
  eluUsage.idle.minimum = Math.min(
    taskElu.idle,
    eluUsage.idle.minimum ?? Infinity
  )
  eluUsage.idle.maximum = Math.max(
    taskElu.idle,
    eluUsage.idle.maximum ?? -Infinity
  )
  eluUsage.active.minimum = Math.min(
    taskElu.active,
    eluUsage.active.minimum ?? Infinity
  )
  eluUsage.active.maximum = Math.max(
    taskElu.active,
    eluUsage.active.maximum ?? -Infinity
  )
  if (eluRequirements.average && workerUsage.tasks.executed !== 0) {
    eluUsage.idle.average =
      eluUsage.idle.aggregate / workerUsage.tasks.executed
    eluUsage.active.average =
      eluUsage.active.aggregate / workerUsage.tasks.executed
  }
  if (eluRequirements.median) {
    eluUsage.idle.history.push(taskElu.idle)
    eluUsage.active.history.push(taskElu.active)
    eluUsage.idle.median = median(eluUsage.idle.history)
    eluUsage.active.median = median(eluUsage.active.history)
  }
}
885
886 /**
887 * Chooses a worker node for the next task.
888 *
889 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
890 *
891 * @returns The worker node key
892 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  // The new dynamic worker is only picked directly when the strategy policy says so.
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
904
905 /**
906 * Conditions for dynamic worker creation.
907 *
908 * @returns Whether to create a dynamic worker or not.
909 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
913
914 /**
915 * Sends a message to the given worker.
916 *
917 * @param worker - The worker which should receive the message.
918 * @param message - The message.
919 */
920 protected abstract sendToWorker (
921 worker: Worker,
922 message: MessageValue<Data>
923 ): void
924
925 /**
926 * Creates a new worker.
927 *
928 * @returns Newly created worker.
929 */
930 protected abstract createWorker (): Worker
931
932 /**
933 * Creates a new worker and sets it up completely in the pool worker nodes.
934 *
935 * @returns New, completely set up worker.
936 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  // The listeners registration order is kept as is: user handlers, then pool internals.
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    workerInfo.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    // No restart while the pool is starting.
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestartWorker) {
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.addWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
970
971 /**
972 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
973 *
974 * @returns New, completely set up dynamic worker.
975 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Soft kill: only honored when the worker node has no executing task and,
    // with the tasks queue enabled, no queued task either.
    const isIdleSoftKill = (): boolean => {
      if (message.kill == null) {
        return false
      }
      const { enableTasksQueue } = this.opts
      if (enableTasksQueue !== true && enableTasksQueue !== false) {
        return false
      }
      if (this.workerNodes[workerNodeKey].usage.tasks.executing !== 0) {
        return false
      }
      return (
        enableTasksQueue === false || this.tasksQueueSize(workerNodeKey) === 0
      )
    }
    if (isKillBehavior(KillBehaviors.HARD, message.kill) || isIdleSoftKill()) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  workerInfo.dynamic = true
  // Flagged ready right away when the strategy policy uses the dynamic worker directly.
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    workerInfo.ready = true
  }
  const checkActiveMessage = {
    checkActive: true,
    workerId: workerInfo.id as number
  }
  this.sendToWorker(worker, checkActiveMessage)
  return worker
}
1004
1005 /**
1006 * Registers a listener callback on the given worker.
1007 *
1008 * @param worker - The worker which should register a listener.
1009 * @param listener - The message listener callback.
1010 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
1017
1018 /**
1019 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
1020 * Can be overridden.
1021 *
1022 * @param worker - The newly created worker.
1023 */
protected afterWorkerSetup (worker: Worker): void {
  // 1. Listen to the worker messages (registered before anything is sent to the worker).
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, messageListener)
  // 2. Send the startup message to the worker.
  this.sendWorkerStartupMessage(worker)
  // 3. Set up the worker task statistics computation.
  this.setWorkerStatistics(worker)
}
1032
/**
 * Sends the startup message (`ready: false` plus the worker id) to the given worker.
 *
 * @param worker - The worker which should receive the startup message.
 */
private sendWorkerStartupMessage (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(worker, {
    ready: false,
    workerId
  })
}
1039
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the target is the first other ready worker node with an
 * empty queue, otherwise the other ready worker node with the fewest queued tasks.
 *
 * If no other ready worker node exists, the redistribution stops and the remaining
 * tasks stay queued on the given worker node. Re-enqueueing a dequeued task on the
 * same worker node would leave its queue size unchanged and the loop would never end.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Never redistribute to the source worker node nor to a not ready one.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty queue cannot be beaten: stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey == null) {
      // No eligible target: bail out instead of looping forever.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1069
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the message worker id, then dispatches the message
 * to the worker ready handler when `ready` is set, otherwise to the task
 * execution handler when `id` is set.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1087
/**
 * Handles a worker ready response: stores the received `ready` flag in the
 * worker info, then emits `PoolEvents.ready` with the pool info when an
 * emitter exists and the pool is ready.
 *
 * @param message - The ready response message received from the worker.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId) as Worker
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null) {
    return
  }
  if (this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1096
/**
 * Handles a task execution response.
 *
 * Settles the promise registered under the message id (rejecting with the task
 * error message, or resolving with the message data), runs the after task
 * execution hook, removes the promise from the map, executes the next task
 * queued on the same worker node when the tasks queue is enabled, and finally
 * updates the worker choice strategy context for that worker node.
 * Messages whose id has no registered promise are ignored.
 *
 * @param message - The task execution response message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    // No promise is registered under this message id: nothing to settle.
    return
  }
  const { taskError } = message
  if (taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  const { worker } = promiseResponse
  this.afterTaskExecutionHook(worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    // The worker just completed a task: hand it the next one from its own queue.
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1121
/**
 * Emits `PoolEvents.busy` when the pool is busy and `PoolEvents.full` when the
 * pool is dynamic and full, each with the pool info.
 * Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1132
/**
 * Gets the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The `info` of the worker node.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1141
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length after the insertion.
 */
private addWorkerNode (worker: Worker): number {
  const node = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Worker nodes created while the pool is starting are flagged ready right away.
    node.info.ready = true
  }
  this.workerNodes.push(node)
  return this.workerNodes.length
}
1156
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context.
 * Does nothing when the worker node key lookup yields -1.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const nodeKey = this.getWorkerNodeKey(worker)
  if (nodeKey === -1) {
    return
  }
  this.workerNodes.splice(nodeKey, 1)
  this.workerChoiceStrategyContext.remove(nodeKey)
}
1169
/**
 * Executes the given task on the worker of the given worker node:
 * runs the before task execution hook, then sends the task to the worker.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1180
/**
 * Enqueues the given task on the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The number returned by the worker node `enqueueTask()` (presumably the tasks queue size, to be confirmed against the worker node implementation).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1184
/**
 * Dequeues a task from the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1188
/**
 * Gets the tasks queue size of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1192
/**
 * Executes every task queued on the worker node stored at the given key,
 * then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1202
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1208
/**
 * Sends to the given worker the task statistics it has to compute.
 * The `runTime` and `elu` flags are the `aggregate` values of the worker choice
 * strategy context task statistics requirements.
 *
 * @param worker - The worker to configure.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    .id as number
  this.sendToWorker(worker, {
    statistics: { runTime, elu },
    workerId
  })
}
1221 }