10a2d514246748a7d21187f22b6b8400b1697864
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round,
17 updateMeasurementStatistics
18 } from '../utils'
19 import { KillBehaviors } from '../worker/worker-options'
20 import {
21 type IPool,
22 PoolEmitter,
23 PoolEvents,
24 type PoolInfo,
25 type PoolOptions,
26 type PoolType,
27 PoolTypes,
28 type TasksQueueOptions
29 } from './pool'
30 import type {
31 IWorker,
32 IWorkerNode,
33 MessageHandler,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
37 } from './worker'
38 import {
39 type MeasurementStatisticsRequirements,
40 Measurements,
41 WorkerChoiceStrategies,
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
44 } from './selection-strategies/selection-strategies-types'
45 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
46 import { version } from './version'
47 import { WorkerNode } from './worker-node'
48
49 /**
50 * Base class that implements some shared logic for all poolifier pools.
51 *
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
55 */
56 export abstract class AbstractPool<
57 Worker extends IWorker,
58 Data = unknown,
59 Response = unknown
60 > implements IPool<Worker, Data, Response> {
61 /** @inheritDoc */
62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
63
64 /** @inheritDoc */
65 public readonly emitter?: PoolEmitter
66
67 /**
68 * The execution response promise map.
69 *
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
72 *
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
74 */
75 protected promiseResponseMap: Map<
76 string,
77 PromiseResponseWrapper<Worker, Response>
78 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
79
80 /**
81 * Worker choice strategy context referencing a worker choice algorithm implementation.
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
84 Worker,
85 Data,
86 Response
87 >
88
89 /**
90 * Whether the pool is starting or not.
91 */
92 private readonly starting: boolean
93 /**
94 * The start timestamp of the pool.
95 */
96 private readonly startTimestamp
97
98 /**
99 * Constructs a new poolifier pool.
100 *
101 * @param numberOfWorkers - Number of workers that this pool should manage.
102 * @param filePath - Path to the worker file.
103 * @param opts - Options for the pool.
104 */
105 public constructor (
106 protected readonly numberOfWorkers: number,
107 protected readonly filePath: string,
108 protected readonly opts: PoolOptions<Worker>
109 ) {
110 if (!this.isMain()) {
111 throw new Error('Cannot start a pool from a worker!')
112 }
113 this.checkNumberOfWorkers(this.numberOfWorkers)
114 this.checkFilePath(this.filePath)
115 this.checkPoolOptions(this.opts)
116
117 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
118 this.executeTask = this.executeTask.bind(this)
119 this.enqueueTask = this.enqueueTask.bind(this)
120 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
121
122 if (this.opts.enableEvents === true) {
123 this.emitter = new PoolEmitter()
124 }
125 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
126 Worker,
127 Data,
128 Response
129 >(
130 this,
131 this.opts.workerChoiceStrategy,
132 this.opts.workerChoiceStrategyOptions
133 )
134
135 this.setupHook()
136
137 this.starting = true
138 this.startPool()
139 this.starting = false
140
141 this.startTimestamp = performance.now()
142 }
143
144 private checkFilePath (filePath: string): void {
145 if (
146 filePath == null ||
147 typeof filePath !== 'string' ||
148 (typeof filePath === 'string' && filePath.trim().length === 0)
149 ) {
150 throw new Error('Please specify a file with a worker implementation')
151 }
152 if (!existsSync(filePath)) {
153 throw new Error(`Cannot find the worker file '${filePath}'`)
154 }
155 }
156
157 private checkNumberOfWorkers (numberOfWorkers: number): void {
158 if (numberOfWorkers == null) {
159 throw new Error(
160 'Cannot instantiate a pool without specifying the number of workers'
161 )
162 } else if (!Number.isSafeInteger(numberOfWorkers)) {
163 throw new TypeError(
164 'Cannot instantiate a pool with a non safe integer number of workers'
165 )
166 } else if (numberOfWorkers < 0) {
167 throw new RangeError(
168 'Cannot instantiate a pool with a negative number of workers'
169 )
170 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
171 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
172 }
173 }
174
175 protected checkDynamicPoolSize (min: number, max: number): void {
176 if (this.type === PoolTypes.dynamic) {
177 if (min > max) {
178 throw new RangeError(
179 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
180 )
181 } else if (max === 0) {
182 throw new RangeError(
183 'Cannot instantiate a dynamic pool with a pool size equal to zero'
184 )
185 } else if (min === max) {
186 throw new RangeError(
187 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
188 )
189 }
190 }
191 }
192
193 private checkPoolOptions (opts: PoolOptions<Worker>): void {
194 if (isPlainObject(opts)) {
195 this.opts.workerChoiceStrategy =
196 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
197 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
198 this.opts.workerChoiceStrategyOptions =
199 opts.workerChoiceStrategyOptions ??
200 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
201 this.checkValidWorkerChoiceStrategyOptions(
202 this.opts.workerChoiceStrategyOptions
203 )
204 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
205 this.opts.enableEvents = opts.enableEvents ?? true
206 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
207 if (this.opts.enableTasksQueue) {
208 this.checkValidTasksQueueOptions(
209 opts.tasksQueueOptions as TasksQueueOptions
210 )
211 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
212 opts.tasksQueueOptions as TasksQueueOptions
213 )
214 }
215 } else {
216 throw new TypeError('Invalid pool options: must be a plain object')
217 }
218 }
219
220 private checkValidWorkerChoiceStrategy (
221 workerChoiceStrategy: WorkerChoiceStrategy
222 ): void {
223 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
224 throw new Error(
225 `Invalid worker choice strategy '${workerChoiceStrategy}'`
226 )
227 }
228 }
229
230 private checkValidWorkerChoiceStrategyOptions (
231 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
232 ): void {
233 if (!isPlainObject(workerChoiceStrategyOptions)) {
234 throw new TypeError(
235 'Invalid worker choice strategy options: must be a plain object'
236 )
237 }
238 if (
239 workerChoiceStrategyOptions.weights != null &&
240 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
241 ) {
242 throw new Error(
243 'Invalid worker choice strategy options: must have a weight for each worker node'
244 )
245 }
246 if (
247 workerChoiceStrategyOptions.measurement != null &&
248 !Object.values(Measurements).includes(
249 workerChoiceStrategyOptions.measurement
250 )
251 ) {
252 throw new Error(
253 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
254 )
255 }
256 }
257
258 private checkValidTasksQueueOptions (
259 tasksQueueOptions: TasksQueueOptions
260 ): void {
261 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
262 throw new TypeError('Invalid tasks queue options: must be a plain object')
263 }
264 if (
265 tasksQueueOptions?.concurrency != null &&
266 !Number.isSafeInteger(tasksQueueOptions.concurrency)
267 ) {
268 throw new TypeError(
269 'Invalid worker tasks concurrency: must be an integer'
270 )
271 }
272 if (
273 tasksQueueOptions?.concurrency != null &&
274 tasksQueueOptions.concurrency <= 0
275 ) {
276 throw new Error(
277 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
278 )
279 }
280 }
281
282 private startPool (): void {
283 while (
284 this.workerNodes.reduce(
285 (accumulator, workerNode) =>
286 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
287 0
288 ) < this.numberOfWorkers
289 ) {
290 this.createAndSetupWorker()
291 }
292 }
293
294 /** @inheritDoc */
295 public get info (): PoolInfo {
296 return {
297 version,
298 type: this.type,
299 worker: this.worker,
300 ready: this.ready,
301 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
302 minSize: this.minSize,
303 maxSize: this.maxSize,
304 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
305 .runTime.aggregate &&
306 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
307 .waitTime.aggregate && { utilization: round(this.utilization) }),
308 workerNodes: this.workerNodes.length,
309 idleWorkerNodes: this.workerNodes.reduce(
310 (accumulator, workerNode) =>
311 workerNode.usage.tasks.executing === 0
312 ? accumulator + 1
313 : accumulator,
314 0
315 ),
316 busyWorkerNodes: this.workerNodes.reduce(
317 (accumulator, workerNode) =>
318 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
319 0
320 ),
321 executedTasks: this.workerNodes.reduce(
322 (accumulator, workerNode) =>
323 accumulator + workerNode.usage.tasks.executed,
324 0
325 ),
326 executingTasks: this.workerNodes.reduce(
327 (accumulator, workerNode) =>
328 accumulator + workerNode.usage.tasks.executing,
329 0
330 ),
331 queuedTasks: this.workerNodes.reduce(
332 (accumulator, workerNode) =>
333 accumulator + workerNode.usage.tasks.queued,
334 0
335 ),
336 maxQueuedTasks: this.workerNodes.reduce(
337 (accumulator, workerNode) =>
338 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
339 0
340 ),
341 failedTasks: this.workerNodes.reduce(
342 (accumulator, workerNode) =>
343 accumulator + workerNode.usage.tasks.failed,
344 0
345 ),
346 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
347 .runTime.aggregate && {
348 runTime: {
349 minimum: round(
350 Math.min(
351 ...this.workerNodes.map(
352 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
353 )
354 )
355 ),
356 maximum: round(
357 Math.max(
358 ...this.workerNodes.map(
359 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
360 )
361 )
362 ),
363 average: round(
364 this.workerNodes.reduce(
365 (accumulator, workerNode) =>
366 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
367 0
368 ) /
369 this.workerNodes.reduce(
370 (accumulator, workerNode) =>
371 accumulator + (workerNode.usage.tasks?.executed ?? 0),
372 0
373 )
374 ),
375 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
376 .runTime.median && {
377 median: round(
378 median(
379 this.workerNodes.map(
380 workerNode => workerNode.usage.runTime?.median ?? 0
381 )
382 )
383 )
384 })
385 }
386 }),
387 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
388 .waitTime.aggregate && {
389 waitTime: {
390 minimum: round(
391 Math.min(
392 ...this.workerNodes.map(
393 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
394 )
395 )
396 ),
397 maximum: round(
398 Math.max(
399 ...this.workerNodes.map(
400 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
401 )
402 )
403 ),
404 average: round(
405 this.workerNodes.reduce(
406 (accumulator, workerNode) =>
407 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
408 0
409 ) /
410 this.workerNodes.reduce(
411 (accumulator, workerNode) =>
412 accumulator + (workerNode.usage.tasks?.executed ?? 0),
413 0
414 )
415 ),
416 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
417 .waitTime.median && {
418 median: round(
419 median(
420 this.workerNodes.map(
421 workerNode => workerNode.usage.waitTime?.median ?? 0
422 )
423 )
424 )
425 })
426 }
427 })
428 }
429 }
430
431 private get ready (): boolean {
432 return (
433 this.workerNodes.reduce(
434 (accumulator, workerNode) =>
435 !workerNode.info.dynamic && workerNode.info.ready
436 ? accumulator + 1
437 : accumulator,
438 0
439 ) >= this.minSize
440 )
441 }
442
443 /**
444 * Gets the approximate pool utilization.
445 *
446 * @returns The pool utilization.
447 */
448 private get utilization (): number {
449 const poolTimeCapacity =
450 (performance.now() - this.startTimestamp) * this.maxSize
451 const totalTasksRunTime = this.workerNodes.reduce(
452 (accumulator, workerNode) =>
453 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
454 0
455 )
456 const totalTasksWaitTime = this.workerNodes.reduce(
457 (accumulator, workerNode) =>
458 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
459 0
460 )
461 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
462 }
463
464 /**
465 * Pool type.
466 *
467 * If it is `'dynamic'`, it provides the `max` property.
468 */
469 protected abstract get type (): PoolType
470
471 /**
472 * Gets the worker type.
473 */
474 protected abstract get worker (): WorkerType
475
476 /**
477 * Pool minimum size.
478 */
479 protected abstract get minSize (): number
480
481 /**
482 * Pool maximum size.
483 */
484 protected abstract get maxSize (): number
485
486 /**
487 * Get the worker given its id.
488 *
489 * @param workerId - The worker id.
490 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
491 */
492 private getWorkerById (workerId: number): Worker | undefined {
493 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
494 ?.worker
495 }
496
497 /**
498 * Checks if the worker id sent in the received message from a worker is valid.
499 *
500 * @param message - The received message.
501 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
502 */
503 private checkMessageWorkerId (message: MessageValue<Response>): void {
504 if (
505 message.workerId != null &&
506 this.getWorkerById(message.workerId) == null
507 ) {
508 throw new Error(
509 `Worker message received from unknown worker '${message.workerId}'`
510 )
511 }
512 }
513
514 /**
515 * Gets the given worker its worker node key.
516 *
517 * @param worker - The worker.
518 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
519 */
520 private getWorkerNodeKey (worker: Worker): number {
521 return this.workerNodes.findIndex(
522 workerNode => workerNode.worker === worker
523 )
524 }
525
526 /** @inheritDoc */
527 public setWorkerChoiceStrategy (
528 workerChoiceStrategy: WorkerChoiceStrategy,
529 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
530 ): void {
531 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
532 this.opts.workerChoiceStrategy = workerChoiceStrategy
533 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
534 this.opts.workerChoiceStrategy
535 )
536 if (workerChoiceStrategyOptions != null) {
537 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
538 }
539 for (const workerNode of this.workerNodes) {
540 workerNode.resetUsage()
541 this.setWorkerStatistics(workerNode.worker)
542 }
543 }
544
545 /** @inheritDoc */
546 public setWorkerChoiceStrategyOptions (
547 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
548 ): void {
549 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
550 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
551 this.workerChoiceStrategyContext.setOptions(
552 this.opts.workerChoiceStrategyOptions
553 )
554 }
555
556 /** @inheritDoc */
557 public enableTasksQueue (
558 enable: boolean,
559 tasksQueueOptions?: TasksQueueOptions
560 ): void {
561 if (this.opts.enableTasksQueue === true && !enable) {
562 this.flushTasksQueues()
563 }
564 this.opts.enableTasksQueue = enable
565 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
566 }
567
568 /** @inheritDoc */
569 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
570 if (this.opts.enableTasksQueue === true) {
571 this.checkValidTasksQueueOptions(tasksQueueOptions)
572 this.opts.tasksQueueOptions =
573 this.buildTasksQueueOptions(tasksQueueOptions)
574 } else if (this.opts.tasksQueueOptions != null) {
575 delete this.opts.tasksQueueOptions
576 }
577 }
578
579 private buildTasksQueueOptions (
580 tasksQueueOptions: TasksQueueOptions
581 ): TasksQueueOptions {
582 return {
583 concurrency: tasksQueueOptions?.concurrency ?? 1
584 }
585 }
586
587 /**
588 * Whether the pool is full or not.
589 *
590 * The pool filling boolean status.
591 */
592 protected get full (): boolean {
593 return this.workerNodes.length >= this.maxSize
594 }
595
596 /**
597 * Whether the pool is busy or not.
598 *
599 * The pool busyness boolean status.
600 */
601 protected abstract get busy (): boolean
602
603 /**
604 * Whether worker nodes are executing at least one task.
605 *
606 * @returns Worker nodes busyness boolean status.
607 */
608 protected internalBusy (): boolean {
609 return (
610 this.workerNodes.findIndex(workerNode => {
611 return workerNode.usage.tasks.executing === 0
612 }) === -1
613 )
614 }
615
616 /** @inheritDoc */
617 public async execute (data?: Data, name?: string): Promise<Response> {
618 const timestamp = performance.now()
619 const workerNodeKey = this.chooseWorkerNode()
620 const submittedTask: Task<Data> = {
621 name: name ?? DEFAULT_TASK_NAME,
622 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
623 data: data ?? ({} as Data),
624 timestamp,
625 workerId: this.getWorkerInfo(workerNodeKey).id as number,
626 id: randomUUID()
627 }
628 const res = new Promise<Response>((resolve, reject) => {
629 this.promiseResponseMap.set(submittedTask.id as string, {
630 resolve,
631 reject,
632 worker: this.workerNodes[workerNodeKey].worker
633 })
634 })
635 if (
636 this.opts.enableTasksQueue === true &&
637 (this.busy ||
638 this.workerNodes[workerNodeKey].usage.tasks.executing >=
639 ((this.opts.tasksQueueOptions as TasksQueueOptions)
640 .concurrency as number))
641 ) {
642 this.enqueueTask(workerNodeKey, submittedTask)
643 } else {
644 this.executeTask(workerNodeKey, submittedTask)
645 }
646 this.checkAndEmitEvents()
647 // eslint-disable-next-line @typescript-eslint/return-await
648 return res
649 }
650
651 /** @inheritDoc */
652 public async destroy (): Promise<void> {
653 await Promise.all(
654 this.workerNodes.map(async (workerNode, workerNodeKey) => {
655 this.flushTasksQueue(workerNodeKey)
656 // FIXME: wait for tasks to be finished
657 const workerExitPromise = new Promise<void>(resolve => {
658 workerNode.worker.on('exit', () => {
659 resolve()
660 })
661 })
662 await this.destroyWorker(workerNode.worker)
663 await workerExitPromise
664 })
665 )
666 }
667
668 /**
669 * Terminates the given worker.
670 *
671 * @param worker - A worker within `workerNodes`.
672 */
673 protected abstract destroyWorker (worker: Worker): void | Promise<void>
674
675 /**
676 * Setup hook to execute code before worker nodes are created in the abstract constructor.
677 * Can be overridden.
678 *
679 * @virtual
680 */
681 protected setupHook (): void {
682 // Intentionally empty
683 }
684
685 /**
686 * Should return whether the worker is the main worker or not.
687 */
688 protected abstract isMain (): boolean
689
690 /**
691 * Hook executed before the worker task execution.
692 * Can be overridden.
693 *
694 * @param workerNodeKey - The worker node key.
695 * @param task - The task to execute.
696 */
697 protected beforeTaskExecutionHook (
698 workerNodeKey: number,
699 task: Task<Data>
700 ): void {
701 const workerUsage = this.workerNodes[workerNodeKey].usage
702 ++workerUsage.tasks.executing
703 this.updateWaitTimeWorkerUsage(workerUsage, task)
704 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
705 task.name as string
706 ) as WorkerUsage
707 ++taskWorkerUsage.tasks.executing
708 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
709 }
710
711 /**
712 * Hook executed after the worker task execution.
713 * Can be overridden.
714 *
715 * @param worker - The worker.
716 * @param message - The received message.
717 */
718 protected afterTaskExecutionHook (
719 worker: Worker,
720 message: MessageValue<Response>
721 ): void {
722 const workerNodeKey = this.getWorkerNodeKey(worker)
723 const workerUsage = this.workerNodes[workerNodeKey].usage
724 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
725 this.updateRunTimeWorkerUsage(workerUsage, message)
726 this.updateEluWorkerUsage(workerUsage, message)
727 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
728 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
729 ) as WorkerUsage
730 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
731 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
732 this.updateEluWorkerUsage(taskWorkerUsage, message)
733 }
734
735 private updateTaskStatisticsWorkerUsage (
736 workerUsage: WorkerUsage,
737 message: MessageValue<Response>
738 ): void {
739 const workerTaskStatistics = workerUsage.tasks
740 --workerTaskStatistics.executing
741 if (message.taskError == null) {
742 ++workerTaskStatistics.executed
743 } else {
744 ++workerTaskStatistics.failed
745 }
746 }
747
748 private updateRunTimeWorkerUsage (
749 workerUsage: WorkerUsage,
750 message: MessageValue<Response>
751 ): void {
752 updateMeasurementStatistics(
753 workerUsage.runTime,
754 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
755 message.taskPerformance?.runTime ?? 0,
756 workerUsage.tasks.executed
757 )
758 }
759
760 private updateWaitTimeWorkerUsage (
761 workerUsage: WorkerUsage,
762 task: Task<Data>
763 ): void {
764 const timestamp = performance.now()
765 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
766 updateMeasurementStatistics(
767 workerUsage.waitTime,
768 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
769 taskWaitTime,
770 workerUsage.tasks.executed
771 )
772 }
773
774 private updateEluWorkerUsage (
775 workerUsage: WorkerUsage,
776 message: MessageValue<Response>
777 ): void {
778 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
779 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
780 updateMeasurementStatistics(
781 workerUsage.elu.active,
782 eluTaskStatisticsRequirements,
783 message.taskPerformance?.elu?.active ?? 0,
784 workerUsage.tasks.executed
785 )
786 updateMeasurementStatistics(
787 workerUsage.elu.idle,
788 eluTaskStatisticsRequirements,
789 message.taskPerformance?.elu?.idle ?? 0,
790 workerUsage.tasks.executed
791 )
792 if (eluTaskStatisticsRequirements.aggregate) {
793 if (message.taskPerformance?.elu != null) {
794 if (workerUsage.elu.utilization != null) {
795 workerUsage.elu.utilization =
796 (workerUsage.elu.utilization +
797 message.taskPerformance.elu.utilization) /
798 2
799 } else {
800 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
801 }
802 }
803 }
804 }
805
806 /**
807 * Chooses a worker node for the next task.
808 *
809 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
810 *
811 * @returns The worker node key
812 */
813 private chooseWorkerNode (): number {
814 if (this.shallCreateDynamicWorker()) {
815 const worker = this.createAndSetupDynamicWorker()
816 if (
817 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
818 ) {
819 return this.getWorkerNodeKey(worker)
820 }
821 }
822 return this.workerChoiceStrategyContext.execute()
823 }
824
825 /**
826 * Conditions for dynamic worker creation.
827 *
828 * @returns Whether to create a dynamic worker or not.
829 */
830 private shallCreateDynamicWorker (): boolean {
831 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
832 }
833
834 /**
835 * Sends a message to the given worker.
836 *
837 * @param worker - The worker which should receive the message.
838 * @param message - The message.
839 */
840 protected abstract sendToWorker (
841 worker: Worker,
842 message: MessageValue<Data>
843 ): void
844
845 /**
846 * Creates a new worker.
847 *
848 * @returns Newly created worker.
849 */
850 protected abstract createWorker (): Worker
851
852 /**
853 * Creates a new worker and sets it up completely in the pool worker nodes.
854 *
855 * @returns New, completely set up worker.
856 */
857 protected createAndSetupWorker (): Worker {
858 const worker = this.createWorker()
859
860 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
861 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
862 worker.on('error', error => {
863 const workerNodeKey = this.getWorkerNodeKey(worker)
864 const workerInfo = this.getWorkerInfo(workerNodeKey)
865 workerInfo.ready = false
866 this.emitter?.emit(PoolEvents.error, error)
867 if (this.opts.restartWorkerOnError === true && !this.starting) {
868 if (workerInfo.dynamic) {
869 this.createAndSetupDynamicWorker()
870 } else {
871 this.createAndSetupWorker()
872 }
873 }
874 if (this.opts.enableTasksQueue === true) {
875 this.redistributeQueuedTasks(workerNodeKey)
876 }
877 })
878 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
879 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
880 worker.once('exit', () => {
881 this.removeWorkerNode(worker)
882 })
883
884 this.addWorkerNode(worker)
885
886 this.afterWorkerSetup(worker)
887
888 return worker
889 }
890
891 /**
892 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
893 *
894 * @returns New, completely set up dynamic worker.
895 */
896 protected createAndSetupDynamicWorker (): Worker {
897 const worker = this.createAndSetupWorker()
898 this.registerWorkerMessageListener(worker, message => {
899 const workerNodeKey = this.getWorkerNodeKey(worker)
900 if (
901 isKillBehavior(KillBehaviors.HARD, message.kill) ||
902 (message.kill != null &&
903 ((this.opts.enableTasksQueue === false &&
904 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
905 (this.opts.enableTasksQueue === true &&
906 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
907 this.tasksQueueSize(workerNodeKey) === 0)))
908 ) {
909 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
910 void (this.destroyWorker(worker) as Promise<void>)
911 }
912 })
913 const workerInfo = this.getWorkerInfoByWorker(worker)
914 workerInfo.dynamic = true
915 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
916 workerInfo.ready = true
917 }
918 this.sendToWorker(worker, {
919 checkActive: true,
920 workerId: workerInfo.id as number
921 })
922 return worker
923 }
924
925 /**
926 * Registers a listener callback on the given worker.
927 *
928 * @param worker - The worker which should register a listener.
929 * @param listener - The message listener callback.
930 */
931 private registerWorkerMessageListener<Message extends Data | Response>(
932 worker: Worker,
933 listener: (message: MessageValue<Message>) => void
934 ): void {
935 worker.on('message', listener as MessageHandler<Worker>)
936 }
937
938 /**
939 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
940 * Can be overridden.
941 *
942 * @param worker - The newly created worker.
943 */
944 protected afterWorkerSetup (worker: Worker): void {
945 // Listen to worker messages.
946 this.registerWorkerMessageListener(worker, this.workerListener())
947 // Send startup message to worker.
948 this.sendWorkerStartupMessage(worker)
949 // Setup worker task statistics computation.
950 this.setWorkerStatistics(worker)
951 }
952
953 private sendWorkerStartupMessage (worker: Worker): void {
954 this.sendToWorker(worker, {
955 ready: false,
956 workerId: this.getWorkerInfoByWorker(worker).id as number
957 })
958 }
959
960 private redistributeQueuedTasks (workerNodeKey: number): void {
961 while (this.tasksQueueSize(workerNodeKey) > 0) {
962 let targetWorkerNodeKey: number = workerNodeKey
963 let minQueuedTasks = Infinity
964 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
965 const workerInfo = this.getWorkerInfo(workerNodeId)
966 if (
967 workerNodeId !== workerNodeKey &&
968 workerInfo.ready &&
969 workerNode.usage.tasks.queued === 0
970 ) {
971 targetWorkerNodeKey = workerNodeId
972 break
973 }
974 if (
975 workerNodeId !== workerNodeKey &&
976 workerInfo.ready &&
977 workerNode.usage.tasks.queued < minQueuedTasks
978 ) {
979 minQueuedTasks = workerNode.usage.tasks.queued
980 targetWorkerNodeKey = workerNodeId
981 }
982 }
983 this.enqueueTask(
984 targetWorkerNodeKey,
985 this.dequeueTask(workerNodeKey) as Task<Data>
986 )
987 }
988 }
989
990 /**
991 * This function is the listener registered for each worker message.
992 *
993 * @returns The listener function to execute when a message is received from a worker.
994 */
995 protected workerListener (): (message: MessageValue<Response>) => void {
996 return message => {
997 this.checkMessageWorkerId(message)
998 if (message.ready != null) {
999 // Worker ready response received
1000 this.handleWorkerReadyResponse(message)
1001 } else if (message.id != null) {
1002 // Task execution response received
1003 this.handleTaskExecutionResponse(message)
1004 }
1005 }
1006 }
1007
1008 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
1009 this.getWorkerInfoByWorker(
1010 this.getWorkerById(message.workerId) as Worker
1011 ).ready = message.ready as boolean
1012 if (this.emitter != null && this.ready) {
1013 this.emitter.emit(PoolEvents.ready, this.info)
1014 }
1015 }
1016
1017 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1018 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1019 if (promiseResponse != null) {
1020 if (message.taskError != null) {
1021 this.emitter?.emit(PoolEvents.taskError, message.taskError)
1022 promiseResponse.reject(message.taskError.message)
1023 } else {
1024 promiseResponse.resolve(message.data as Response)
1025 }
1026 this.afterTaskExecutionHook(promiseResponse.worker, message)
1027 this.promiseResponseMap.delete(message.id as string)
1028 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1029 if (
1030 this.opts.enableTasksQueue === true &&
1031 this.tasksQueueSize(workerNodeKey) > 0
1032 ) {
1033 this.executeTask(
1034 workerNodeKey,
1035 this.dequeueTask(workerNodeKey) as Task<Data>
1036 )
1037 }
1038 this.workerChoiceStrategyContext.update(workerNodeKey)
1039 }
1040 }
1041
1042 private checkAndEmitEvents (): void {
1043 if (this.emitter != null) {
1044 if (this.busy) {
1045 this.emitter.emit(PoolEvents.busy, this.info)
1046 }
1047 if (this.type === PoolTypes.dynamic && this.full) {
1048 this.emitter.emit(PoolEvents.full, this.info)
1049 }
1050 }
1051 }
1052
1053 /**
1054 * Gets the worker information from the given worker node key.
1055 *
1056 * @param workerNodeKey - The worker node key.
1057 */
1058 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1059 return this.workerNodes[workerNodeKey].info
1060 }
1061
1062 /**
1063 * Gets the worker information from the given worker.
1064 *
1065 * @param worker - The worker.
1066 */
1067 private getWorkerInfoByWorker (worker: Worker): WorkerInfo {
1068 return this.workerNodes[this.getWorkerNodeKey(worker)].info
1069 }
1070
1071 /**
1072 * Adds the given worker in the pool worker nodes.
1073 *
1074 * @param worker - The worker.
1075 * @returns The worker nodes length.
1076 */
1077 private addWorkerNode (worker: Worker): number {
1078 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
1079 // Flag the worker node as ready at pool startup.
1080 if (this.starting) {
1081 workerNode.info.ready = true
1082 }
1083 return this.workerNodes.push(workerNode)
1084 }
1085
1086 /**
1087 * Removes the given worker from the pool worker nodes.
1088 *
1089 * @param worker - The worker.
1090 */
1091 private removeWorkerNode (worker: Worker): void {
1092 const workerNodeKey = this.getWorkerNodeKey(worker)
1093 if (workerNodeKey !== -1) {
1094 this.workerNodes.splice(workerNodeKey, 1)
1095 this.workerChoiceStrategyContext.remove(workerNodeKey)
1096 }
1097 }
1098
1099 /**
1100 * Executes the given task on the given worker.
1101 *
1102 * @param worker - The worker.
1103 * @param task - The task to execute.
1104 */
1105 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1106 this.beforeTaskExecutionHook(workerNodeKey, task)
1107 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1108 }
1109
1110 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1111 return this.workerNodes[workerNodeKey].enqueueTask(task)
1112 }
1113
1114 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1115 return this.workerNodes[workerNodeKey].dequeueTask()
1116 }
1117
1118 private tasksQueueSize (workerNodeKey: number): number {
1119 return this.workerNodes[workerNodeKey].tasksQueueSize()
1120 }
1121
1122 private flushTasksQueue (workerNodeKey: number): void {
1123 while (this.tasksQueueSize(workerNodeKey) > 0) {
1124 this.executeTask(
1125 workerNodeKey,
1126 this.dequeueTask(workerNodeKey) as Task<Data>
1127 )
1128 }
1129 this.workerNodes[workerNodeKey].clearTasksQueue()
1130 }
1131
1132 private flushTasksQueues (): void {
1133 for (const [workerNodeKey] of this.workerNodes.entries()) {
1134 this.flushTasksQueue(workerNodeKey)
1135 }
1136 }
1137
1138 private setWorkerStatistics (worker: Worker): void {
1139 this.sendToWorker(worker, {
1140 statistics: {
1141 runTime:
1142 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1143 .runTime.aggregate,
1144 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1145 .elu.aggregate
1146 },
1147 workerId: this.getWorkerInfoByWorker(worker).id as number
1148 })
1149 }
1150 }