b43d44a59eea1cb1c7990e660b96cb4cd5483a03
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round
17 } from '../utils'
18 import { KillBehaviors } from '../worker/worker-options'
19 import {
20 type IPool,
21 PoolEmitter,
22 PoolEvents,
23 type PoolInfo,
24 type PoolOptions,
25 type PoolType,
26 PoolTypes,
27 type TasksQueueOptions
28 } from './pool'
29 import type {
30 IWorker,
31 IWorkerNode,
32 MessageHandler,
33 WorkerInfo,
34 WorkerType,
35 WorkerUsage
36 } from './worker'
37 import {
38 Measurements,
39 WorkerChoiceStrategies,
40 type WorkerChoiceStrategy,
41 type WorkerChoiceStrategyOptions
42 } from './selection-strategies/selection-strategies-types'
43 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
44 import { version } from './version'
45 import { WorkerNode } from './worker-node'
46
47 /**
48 * Base class that implements some shared logic for all poolifier pools.
49 *
50 * @typeParam Worker - Type of worker which manages this pool.
51 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
52 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
53 */
54 export abstract class AbstractPool<
55 Worker extends IWorker,
56 Data = unknown,
57 Response = unknown
58 > implements IPool<Worker, Data, Response> {
59 /** @inheritDoc */
60 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
61
62 /** @inheritDoc */
63 public readonly emitter?: PoolEmitter
64
65 /**
66 * The execution response promise map.
67 *
68 * - `key`: The message id of each submitted task.
69 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
70 *
71 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
72 */
73 protected promiseResponseMap: Map<
74 string,
75 PromiseResponseWrapper<Worker, Response>
76 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * The start timestamp of the pool.
89 */
90 private readonly startTimestamp
91
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first, then the internal task handling methods
 * are bound to the instance, the event emitter is created when events are
 * enabled, the worker choice strategy context is built, the setup hook is run
 * and worker nodes are created until the number of non dynamic ones reaches
 * `numberOfWorkers`. The pool start timestamp is recorded last.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation, pool options get their default values here.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` stable when these methods are passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  const eventsEnabled = this.opts.enableEvents === true
  if (eventsEnabled) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The count is re-evaluated on each iteration since creating a worker adds a worker node.
  const nonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
143
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank or if the file does not exist.
 */
private checkFilePath (filePath: string): void {
  const missingFilePath =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (missingFilePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
156
/**
 * Checks that the requested number of workers is usable for this pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only fixed pools refuse a zero sized pool here.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
174
/**
 * Checks the size boundaries of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `min` is greater than `max`, if `max` is zero or if `min` equals `max`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
192
/**
 * Validates the pool options and writes the default value of each unset option
 * into `this.opts`.
 *
 * Defaults: round robin worker choice strategy, default worker choice strategy
 * options, worker restart on error enabled, events enabled, tasks queue disabled.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  const workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and normalized when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
219
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
229
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, exactly `maxSize` of them are expected.
  const weightsMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (weightsMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const unknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (unknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
257
/**
 * Checks the tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency } = tasksQueueOptions
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
281
/** @inheritDoc */
public get info (): PoolInfo {
  // Statistics the current worker choice strategy requires: they drive which
  // optional sections (utilization, runTime, waitTime, medians) are reported.
  const statisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()

  // Sums a numeric value picked from each worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )

  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(predicate).length

  // Builds the pool wide statistics of a time measurement from the worker nodes usage.
  const timeStatistics = (
    measurement: 'runTime' | 'waitTime'
  ): NonNullable<PoolInfo['runTime']> => {
    const aggregate = sumOverWorkerNodes(
      workerNode => workerNode.usage[measurement]?.aggregate ?? 0
    )
    const executedTasks = sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.executed ?? 0
    )
    return {
      minimum: round(
        Math.min(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        Math.max(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
          )
        )
      ),
      // Before any task has been executed the division would be 0 / 0, i.e. NaN:
      // report 0 instead.
      average: round(executedTasks === 0 ? 0 : aggregate / executedTasks),
      ...(statisticsRequirements[measurement].median && {
        median: round(
          median(
            this.workerNodes.map(
              workerNode => workerNode.usage[measurement]?.median ?? 0
            )
          )
        )
      })
    }
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // The utilization needs both the run time and the wait time aggregates.
    ...(statisticsRequirements.runTime.aggregate &&
      statisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(statisticsRequirements.runTime.aggregate && {
      runTime: timeStatistics('runTime')
    }),
    ...(statisticsRequirements.waitTime.aggregate && {
      waitTime: timeStatistics('waitTime')
    })
  }
}
418
/**
 * Whether the pool is still starting, i.e. whether it holds fewer non dynamic
 * worker nodes than its minimum size.
 */
private get starting (): boolean {
  const nonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic
  )
  return nonDynamicWorkerNodes.length < this.minSize
}
428
/**
 * Whether the pool is ready, i.e. whether at least `minSize` non dynamic
 * worker nodes are flagged as ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
440
/**
 * Gets the approximate pool utilization: the tasks run time plus the tasks wait
 * time aggregated over all worker nodes, divided by the time elapsed since the
 * pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
461
/**
 * The type of the pool.
 *
 * The base class compares it against `PoolTypes.fixed` and `PoolTypes.dynamic`
 * to select the size checks to apply and to decide whether dynamic workers can
 * be created.
 */
protected abstract get type (): PoolType
468
/**
 * The type of worker managed by the pool, reported as `worker` in the pool information.
 */
protected abstract get worker (): WorkerType
473
/**
 * The minimum size of the pool: the number of non dynamic worker nodes needed
 * for the pool to be considered started and ready.
 */
protected abstract get minSize (): number
478
/**
 * The maximum size of the pool: the number of worker nodes from which the pool
 * is considered full.
 */
protected abstract get maxSize (): number
483
/**
 * Looks up a worker from its id.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose `info.id` equals `workerId`, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    workerNode => workerNode.info.id === workerId
  )
  return matchingWorkerNode?.worker
}
494
/**
 * Checks that the worker id carried by a message received from a worker, if
 * any, belongs to a worker of the pool. Messages without a worker id are accepted.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  const sender = this.getWorkerById(workerId)
  if (sender == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
511
/**
 * Gets the worker node key of the given worker, i.e. the index of its worker
 * node in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
523
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  // Reject unknown strategies before touching the pool state.
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node has its usage reset and its statistics set up again.
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
542
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Invalid options throw before anything is stored.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
553
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the tasks queue off: flush the tasks queues first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
565
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop previously stored options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const normalizedTasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = normalizedTasksQueueOptions
}
576
/**
 * Builds the tasks queue options with default values applied.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly `null` or `undefined`.
 * @returns New tasks queue options whose `concurrency` defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
584
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes has
 * reached the pool maximum size.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
593
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status. When the tasks queue is enabled, a busy
 * pool makes `execute()` enqueue the submitted task instead of executing it
 * right away.
 */
protected abstract get busy (): boolean
600
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` when no worker node has zero executing tasks (including when there is no worker node at all), `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
613
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // The submission timestamp is taken before a worker node is chosen.
  const submissionTimestamp = performance.now()
  const chosenWorkerNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name: name ?? DEFAULT_TASK_NAME,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp: submissionTimestamp,
    workerId: this.getWorkerInfo(chosenWorkerNodeKey).id as number,
    id: randomUUID()
  }
  // Register the resolve/reject callbacks under the task id so that the worker
  // response can settle the promise later on.
  const response = new Promise<Response>((resolve, reject) => {
    const { worker } = this.workerNodes[chosenWorkerNodeKey]
    this.promiseResponseMap.set(task.id as string, { resolve, reject, worker })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy or
  // when the chosen worker node already executes `concurrency` tasks.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[chosenWorkerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(chosenWorkerNodeKey, task)
  } else {
    this.executeTask(chosenWorkerNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
648
/** @inheritDoc */
public async destroy (): Promise<void> {
  const workerTerminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      const { worker } = workerNode
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // Listen to the 'exit' event before asking for the worker termination.
      const workerExited = new Promise<void>(resolve => {
        worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(worker)
      await workerExited
    }
  )
  await Promise.all(workerTerminations)
}
665
/**
 * Terminates the given worker.
 *
 * The returned value is awaited by `destroy()`, so implementations may be
 * synchronous or return a promise.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
672
/**
 * Extension point called by the abstract constructor right before the worker
 * nodes are created. The default implementation does nothing.
 * Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
682
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor refuses to build a pool when this returns `false`.
 */
protected abstract isMain (): boolean
687
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics,
 * first on the worker node usage, then on the worker node usage of the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const recordTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  recordTaskStart(workerNode.usage)
  recordTaskStart(
    workerNode.getTaskWorkerUsage(task.name as string) as WorkerUsage
  )
}
708
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks counters, the run time statistics and the ELU statistics,
 * first on the worker node usage, then on the worker node usage of the task
 * name found in the message task performance (default task name otherwise).
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  recordTaskEnd(workerNode.usage)
  const taskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  recordTaskEnd(workerNode.getTaskWorkerUsage(taskName) as WorkerUsage)
}
732
/**
 * Updates the tasks counters of a worker usage once a task execution response
 * has been received.
 *
 * @param workerUsage - The worker usage whose `tasks` counters are updated.
 * @param message - The received message: a non null `taskError` counts the task as failed, otherwise it counts as executed.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const workerTaskStatistics = workerUsage.tasks
  // setWorkerChoiceStrategy() calls resetUsage() on every worker node, possibly
  // while tasks are still in flight (presumably zeroing the counters): never
  // let the executing tasks counter become negative.
  if (workerTaskStatistics.executing > 0) {
    --workerTaskStatistics.executing
  }
  if (message.taskError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
745
/**
 * Updates the run time statistics of a worker usage from a task execution
 * response. Does nothing unless the worker choice strategy requires the run
 * time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, a missing task run time counts as `0`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { runTime: required } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!required.aggregate) {
    return
  }
  const runTimeStatistics = workerUsage.runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  runTimeStatistics.aggregate =
    (runTimeStatistics.aggregate ?? 0) + taskRunTime
  runTimeStatistics.minimum = Math.min(
    taskRunTime,
    runTimeStatistics.minimum ?? Infinity
  )
  runTimeStatistics.maximum = Math.max(
    taskRunTime,
    runTimeStatistics.maximum ?? -Infinity
  )
  const executedTasks = workerUsage.tasks.executed
  if (required.average && executedTasks !== 0) {
    runTimeStatistics.average = runTimeStatistics.aggregate / executedTasks
  }
  // The history only records run times actually reported by the worker.
  if (required.median && message.taskPerformance?.runTime != null) {
    runTimeStatistics.history.push(message.taskPerformance.runTime)
    runTimeStatistics.median = median(runTimeStatistics.history)
  }
}
783
/**
 * Updates the wait time statistics of a worker usage when a task is about to
 * be executed. The task wait time is the time elapsed since the task
 * timestamp, `0` if the task has no timestamp. Does nothing unless the worker
 * choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const { waitTime: required } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!required.aggregate) {
    return
  }
  const waitTimeStatistics = workerUsage.waitTime
  waitTimeStatistics.aggregate =
    (waitTimeStatistics.aggregate ?? 0) + taskWaitTime
  waitTimeStatistics.minimum = Math.min(
    taskWaitTime,
    waitTimeStatistics.minimum ?? Infinity
  )
  waitTimeStatistics.maximum = Math.max(
    taskWaitTime,
    waitTimeStatistics.maximum ?? -Infinity
  )
  const executedTasks = workerUsage.tasks.executed
  if (required.average && executedTasks !== 0) {
    waitTimeStatistics.average = waitTimeStatistics.aggregate / executedTasks
  }
  if (required.median) {
    waitTimeStatistics.history.push(taskWaitTime)
    waitTimeStatistics.median = median(waitTimeStatistics.history)
  }
}
821
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a
 * task execution response. Does nothing unless the worker choice strategy
 * requires the ELU aggregate and the message carries an ELU measurement.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { elu: required } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!required.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    return
  }
  const eluStatistics = workerUsage.elu
  const { idle, active } = eluStatistics
  idle.aggregate = (idle.aggregate ?? 0) + taskElu.idle
  active.aggregate = (active.aggregate ?? 0) + taskElu.active
  // The stored utilization is the mean of its previous value and the task one.
  eluStatistics.utilization =
    eluStatistics.utilization != null
      ? (eluStatistics.utilization + taskElu.utilization) / 2
      : taskElu.utilization
  idle.minimum = Math.min(taskElu.idle, idle.minimum ?? Infinity)
  idle.maximum = Math.max(taskElu.idle, idle.maximum ?? -Infinity)
  active.minimum = Math.min(taskElu.active, active.minimum ?? Infinity)
  active.maximum = Math.max(taskElu.active, active.maximum ?? -Infinity)
  const executedTasks = workerUsage.tasks.executed
  if (required.average && executedTasks !== 0) {
    idle.average = idle.aggregate / executedTasks
    active.average = active.aggregate / executedTasks
  }
  if (required.median) {
    idle.history.push(taskElu.idle)
    active.history.push(taskElu.active)
    idle.median = median(idle.history)
    active.median = median(active.history)
  }
}
885
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its worker
 * node is chosen if the strategy policy uses dynamic workers. In every other
 * case the choice is delegated to the worker choice strategy context.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
904
/**
 * Conditions for dynamic worker creation: the pool is dynamic, it is not full
 * and every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
913
/**
 * Sends a message to the given worker.
 *
 * The base class uses it for the worker startup message and for the dynamic
 * worker activity check message.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
924
/**
 * Creates a new worker. Event listeners and the worker node registration are
 * handled by `createAndSetupWorker()`, not by this method.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
931
/**
 * Creates a new worker and sets it up completely in the pool worker nodes:
 * the user supplied handlers and the pool internal handlers are attached, the
 * worker is added to the worker nodes and `afterWorkerSetup()` is run.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Pool internal error handling: flag the worker as not ready, emit the error,
  // optionally start a replacement worker and move the queued tasks away.
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    workerInfo.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart) {
      // The replacement worker is of the same kind as the failed one.
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker is removed from the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.addWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
972
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular worker setup, a message listener destroys the worker
 * when it sends a kill message, the worker information is flagged as dynamic
 * (and as ready when the strategy policy uses dynamic workers) and a
 * `checkActive` message is sent to the worker.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Whether the worker node has nothing left to do: no executing task and,
    // with the tasks queue enabled, no queued task either.
    const workerNodeIdle = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    // A hard kill behavior destroys the worker unconditionally, any other kill
    // message only destroys an idle worker.
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && workerNodeIdle())
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  workerInfo.dynamic = true
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    workerInfo.ready = true
  }
  this.sendToWorker(worker, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  return worker
}
1006
/**
 * Registers a listener callback on the 'message' event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
1019
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * Can be overridden.
 *
 * The default implementation registers the pool message listener on the
 * worker, sends it the startup message and sets up its statistics.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Worker messages handling.
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolMessageListener)
  // Worker startup message.
  this.sendWorkerStartupMessage(worker)
  // Worker task statistics computation setup.
  this.setWorkerStatistics(worker)
}
1034
/**
 * Sends the startup message to the given worker: `ready` set to `false` along
 * with the id found in the worker information.
 *
 * @param worker - The worker which should receive the startup message.
 */
private sendWorkerStartupMessage (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(worker, { ready: false, workerId })
}
1041
/**
 * Moves the tasks queued on the given worker node to the other ready worker
 * nodes, one task at a time.
 *
 * Each task goes to the first other ready worker node without queued task or,
 * failing that, to the other ready worker node with the fewest queued tasks.
 * When no other ready worker node exists the redistribution stops and the
 * remaining tasks are left where they are: re-enqueuing them on the source
 * worker node would never empty its queue and the loop would not terminate.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
      // A worker node without queued task cannot be beaten: stop searching.
      if (queuedTasks === 0) {
        break
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other ready worker node can take the tasks over.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1071
/**
 * Builds the listener registered for each worker message.
 *
 * The returned listener first checks the worker id of the message, then
 * dispatches it: a message with a `ready` field is handled as a worker ready
 * response, otherwise a message with an `id` field is handled as a task
 * execution response. Any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1089
/**
 * Handles a worker ready response.
 *
 * Stores the `ready` flag of the message in the information of the worker
 * node matching the message worker id, then emits the pool `ready` event if
 * an emitter exists and the pool reports itself as ready.
 *
 * @param message - The ready response message received from the worker.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId) as Worker
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  workerInfo.ready = message.ready as boolean
  const emitter = this.emitter
  if (emitter == null || !this.ready) {
    return
  }
  emitter.emit(PoolEvents.ready, this.info)
}
1098
/**
 * Handles a task execution response.
 *
 * Nothing is done if no pending promise matches the message id. Otherwise the
 * promise is settled (rejected with the task error message, after emitting
 * the `taskError` event when an emitter exists, or resolved with the message
 * data), the after task execution hook is run, the promise entry is removed,
 * the next queued task of the worker node is executed when the tasks queue is
 * enabled and not empty, and the worker choice strategy context is updated.
 *
 * @param message - The task execution response message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { worker } = promiseResponse
  this.afterTaskExecutionHook(worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    // Chain the next queued task on the worker node that just finished one.
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1125
/**
 * Emits the pool state events, if an emitter exists.
 *
 * The `busy` event is emitted when the pool is busy; the `full` event is
 * emitted when the pool is of dynamic type and full. Both events carry the
 * pool information.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1136
/**
 * Gets the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key, i.e. its index in the pool worker nodes.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1145
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * While the pool is starting, the new worker node is flagged as ready right away.
 *
 * @param worker - The worker.
 * @returns The worker nodes length after the addition.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Flag the worker node as ready at pool startup.
    addedWorkerNode.info.ready = true
  }
  const workerNodesLength = this.workerNodes.push(addedWorkerNode)
  return workerNodesLength
}
1160
/**
 * Removes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context.
 *
 * Nothing is done when the worker node key lookup yields `-1`.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1173
/**
 * Executes the given task on the worker of the given worker node.
 *
 * The before task execution hook is run first, then the task is sent to the worker.
 *
 * @param workerNodeKey - The key of the worker node that executes the task.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const workerNode = this.workerNodes[workerNodeKey]
  this.sendToWorker(workerNode.worker, task)
}
1184
/**
 * Adds the given task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1188
/**
 * Takes a task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1192
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1196
/**
 * Flushes the tasks queue of the given worker node.
 *
 * Every queued task is dequeued and executed on that worker node until the
 * queue reports a size of zero, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
    // Re-read the size: it is the queue itself that drives the loop.
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1206
/**
 * Flushes the tasks queue of every worker node of the pool, in worker node key order.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1212
/**
 * Sends the task statistics settings to the given worker.
 *
 * The message tells the worker whether the run time and the event loop
 * utilization (ELU) have to be aggregated, as required by the worker choice
 * strategy context, and carries the worker id.
 *
 * @param worker - The worker that receives the statistics settings.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(worker, {
    statistics: { runTime, elu },
    workerId
  })
}
1225 }