27d8af0f72713d6a5e224594f221d2f1b9591eec
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type {
4 MessageValue,
5 PromiseResponseWrapper,
6 Task
7 } from '../utility-types'
8 import {
9 DEFAULT_TASK_NAME,
10 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
11 EMPTY_FUNCTION,
12 isKillBehavior,
13 isPlainObject,
14 median,
15 round
16 } from '../utils'
17 import { KillBehaviors } from '../worker/worker-options'
18 import {
19 type IPool,
20 PoolEmitter,
21 PoolEvents,
22 type PoolInfo,
23 type PoolOptions,
24 type PoolType,
25 PoolTypes,
26 type TasksQueueOptions
27 } from './pool'
28 import type {
29 IWorker,
30 IWorkerNode,
31 MessageHandler,
32 WorkerInfo,
33 WorkerType,
34 WorkerUsage
35 } from './worker'
36 import {
37 Measurements,
38 WorkerChoiceStrategies,
39 type WorkerChoiceStrategy,
40 type WorkerChoiceStrategyOptions
41 } from './selection-strategies/selection-strategies-types'
42 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
43 import { version } from './version'
44 import { WorkerNode } from './worker-node'
45
46 /**
47 * Base class that implements some shared logic for all poolifier pools.
48 *
49 * @typeParam Worker - Type of worker which manages this pool.
50 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
51 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
52 */
53 export abstract class AbstractPool<
54 Worker extends IWorker,
55 Data = unknown,
56 Response = unknown
57 > implements IPool<Worker, Data, Response> {
58 /** @inheritDoc */
59 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
60
61 /** @inheritDoc */
62 public readonly emitter?: PoolEmitter
63
64 /**
65 * The execution response promise map.
66 *
67 * - `key`: The message id of each submitted task.
68 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
69 *
70 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
71 */
72 protected promiseResponseMap: Map<
73 string,
74 PromiseResponseWrapper<Worker, Response>
75 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
76
77 /**
78 * Worker choice strategy context referencing a worker choice algorithm implementation.
79 */
80 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
81 Worker,
82 Data,
83 Response
84 >
85
86 /**
87 * The start timestamp of the pool.
88 */
89 private readonly startTimestamp
90
91 /**
92 * Constructs a new poolifier pool.
93 *
94 * @param numberOfWorkers - Number of workers that this pool should manage.
95 * @param filePath - Path to the worker file.
96 * @param opts - Options for the pool.
97 */
98 public constructor (
99 protected readonly numberOfWorkers: number,
100 protected readonly filePath: string,
101 protected readonly opts: PoolOptions<Worker>
102 ) {
103 if (!this.isMain()) {
104 throw new Error('Cannot start a pool from a worker!')
105 }
106 this.checkNumberOfWorkers(this.numberOfWorkers)
107 this.checkFilePath(this.filePath)
108 this.checkPoolOptions(this.opts)
109
110 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
111 this.executeTask = this.executeTask.bind(this)
112 this.enqueueTask = this.enqueueTask.bind(this)
113 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
114
115 if (this.opts.enableEvents === true) {
116 this.emitter = new PoolEmitter()
117 }
118 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
119 Worker,
120 Data,
121 Response
122 >(
123 this,
124 this.opts.workerChoiceStrategy,
125 this.opts.workerChoiceStrategyOptions
126 )
127
128 this.setupHook()
129
130 while (this.workerNodes.length < this.numberOfWorkers) {
131 this.createAndSetupWorker()
132 }
133
134 this.startTimestamp = performance.now()
135 }
136
137 private checkFilePath (filePath: string): void {
138 if (
139 filePath == null ||
140 (typeof filePath === 'string' && filePath.trim().length === 0)
141 ) {
142 throw new Error('Please specify a file with a worker implementation')
143 }
144 }
145
146 private checkNumberOfWorkers (numberOfWorkers: number): void {
147 if (numberOfWorkers == null) {
148 throw new Error(
149 'Cannot instantiate a pool without specifying the number of workers'
150 )
151 } else if (!Number.isSafeInteger(numberOfWorkers)) {
152 throw new TypeError(
153 'Cannot instantiate a pool with a non safe integer number of workers'
154 )
155 } else if (numberOfWorkers < 0) {
156 throw new RangeError(
157 'Cannot instantiate a pool with a negative number of workers'
158 )
159 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
160 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
161 }
162 }
163
164 protected checkDynamicPoolSize (min: number, max: number): void {
165 if (this.type === PoolTypes.dynamic) {
166 if (min > max) {
167 throw new RangeError(
168 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
169 )
170 } else if (min === 0 && max === 0) {
171 throw new RangeError(
172 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
173 )
174 } else if (min === max) {
175 throw new RangeError(
176 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
177 )
178 }
179 }
180 }
181
182 private checkPoolOptions (opts: PoolOptions<Worker>): void {
183 if (isPlainObject(opts)) {
184 this.opts.workerChoiceStrategy =
185 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
186 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
187 this.opts.workerChoiceStrategyOptions =
188 opts.workerChoiceStrategyOptions ??
189 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
190 this.checkValidWorkerChoiceStrategyOptions(
191 this.opts.workerChoiceStrategyOptions
192 )
193 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
194 this.opts.enableEvents = opts.enableEvents ?? true
195 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
196 if (this.opts.enableTasksQueue) {
197 this.checkValidTasksQueueOptions(
198 opts.tasksQueueOptions as TasksQueueOptions
199 )
200 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
201 opts.tasksQueueOptions as TasksQueueOptions
202 )
203 }
204 } else {
205 throw new TypeError('Invalid pool options: must be a plain object')
206 }
207 }
208
209 private checkValidWorkerChoiceStrategy (
210 workerChoiceStrategy: WorkerChoiceStrategy
211 ): void {
212 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
213 throw new Error(
214 `Invalid worker choice strategy '${workerChoiceStrategy}'`
215 )
216 }
217 }
218
219 private checkValidWorkerChoiceStrategyOptions (
220 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
221 ): void {
222 if (!isPlainObject(workerChoiceStrategyOptions)) {
223 throw new TypeError(
224 'Invalid worker choice strategy options: must be a plain object'
225 )
226 }
227 if (
228 workerChoiceStrategyOptions.weights != null &&
229 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
230 ) {
231 throw new Error(
232 'Invalid worker choice strategy options: must have a weight for each worker node'
233 )
234 }
235 if (
236 workerChoiceStrategyOptions.measurement != null &&
237 !Object.values(Measurements).includes(
238 workerChoiceStrategyOptions.measurement
239 )
240 ) {
241 throw new Error(
242 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
243 )
244 }
245 }
246
247 private checkValidTasksQueueOptions (
248 tasksQueueOptions: TasksQueueOptions
249 ): void {
250 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
251 throw new TypeError('Invalid tasks queue options: must be a plain object')
252 }
253 if (
254 tasksQueueOptions?.concurrency != null &&
255 !Number.isSafeInteger(tasksQueueOptions.concurrency)
256 ) {
257 throw new TypeError(
258 'Invalid worker tasks concurrency: must be an integer'
259 )
260 }
261 if (
262 tasksQueueOptions?.concurrency != null &&
263 tasksQueueOptions.concurrency <= 0
264 ) {
265 throw new Error(
266 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
267 )
268 }
269 }
270
271 /** @inheritDoc */
272 public get info (): PoolInfo {
273 return {
274 version,
275 type: this.type,
276 worker: this.worker,
277 ready: this.ready,
278 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
279 minSize: this.minSize,
280 maxSize: this.maxSize,
281 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
282 .runTime.aggregate &&
283 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
284 .waitTime.aggregate && { utilization: round(this.utilization) }),
285 workerNodes: this.workerNodes.length,
286 idleWorkerNodes: this.workerNodes.reduce(
287 (accumulator, workerNode) =>
288 workerNode.usage.tasks.executing === 0
289 ? accumulator + 1
290 : accumulator,
291 0
292 ),
293 busyWorkerNodes: this.workerNodes.reduce(
294 (accumulator, workerNode) =>
295 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
296 0
297 ),
298 executedTasks: this.workerNodes.reduce(
299 (accumulator, workerNode) =>
300 accumulator + workerNode.usage.tasks.executed,
301 0
302 ),
303 executingTasks: this.workerNodes.reduce(
304 (accumulator, workerNode) =>
305 accumulator + workerNode.usage.tasks.executing,
306 0
307 ),
308 queuedTasks: this.workerNodes.reduce(
309 (accumulator, workerNode) =>
310 accumulator + workerNode.usage.tasks.queued,
311 0
312 ),
313 maxQueuedTasks: this.workerNodes.reduce(
314 (accumulator, workerNode) =>
315 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
316 0
317 ),
318 failedTasks: this.workerNodes.reduce(
319 (accumulator, workerNode) =>
320 accumulator + workerNode.usage.tasks.failed,
321 0
322 ),
323 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
324 .runTime.aggregate && {
325 runTime: {
326 minimum: round(
327 Math.min(
328 ...this.workerNodes.map(
329 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
330 )
331 )
332 ),
333 maximum: round(
334 Math.max(
335 ...this.workerNodes.map(
336 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
337 )
338 )
339 ),
340 average: round(
341 this.workerNodes.reduce(
342 (accumulator, workerNode) =>
343 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
344 0
345 ) /
346 this.workerNodes.reduce(
347 (accumulator, workerNode) =>
348 accumulator + (workerNode.usage.tasks?.executed ?? 0),
349 0
350 )
351 ),
352 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
353 .runTime.median && {
354 median: round(
355 median(
356 this.workerNodes.map(
357 workerNode => workerNode.usage.runTime?.median ?? 0
358 )
359 )
360 )
361 })
362 }
363 }),
364 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
365 .waitTime.aggregate && {
366 waitTime: {
367 minimum: round(
368 Math.min(
369 ...this.workerNodes.map(
370 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
371 )
372 )
373 ),
374 maximum: round(
375 Math.max(
376 ...this.workerNodes.map(
377 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
378 )
379 )
380 ),
381 average: round(
382 this.workerNodes.reduce(
383 (accumulator, workerNode) =>
384 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
385 0
386 ) /
387 this.workerNodes.reduce(
388 (accumulator, workerNode) =>
389 accumulator + (workerNode.usage.tasks?.executed ?? 0),
390 0
391 )
392 ),
393 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
394 .waitTime.median && {
395 median: round(
396 median(
397 this.workerNodes.map(
398 workerNode => workerNode.usage.waitTime?.median ?? 0
399 )
400 )
401 )
402 })
403 }
404 })
405 }
406 }
407
408 private get starting (): boolean {
409 return (
410 this.workerNodes.length < this.minSize ||
411 (this.workerNodes.length >= this.minSize &&
412 this.workerNodes.some(workerNode => !workerNode.info.ready))
413 )
414 }
415
416 private get ready (): boolean {
417 return (
418 this.workerNodes.length >= this.minSize &&
419 this.workerNodes.every(workerNode => workerNode.info.ready)
420 )
421 }
422
423 /**
424 * Gets the approximate pool utilization.
425 *
426 * @returns The pool utilization.
427 */
428 private get utilization (): number {
429 const poolTimeCapacity =
430 (performance.now() - this.startTimestamp) * this.maxSize
431 const totalTasksRunTime = this.workerNodes.reduce(
432 (accumulator, workerNode) =>
433 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
434 0
435 )
436 const totalTasksWaitTime = this.workerNodes.reduce(
437 (accumulator, workerNode) =>
438 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
439 0
440 )
441 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
442 }
443
444 /**
445 * Pool type.
446 *
447 * If it is `'dynamic'`, it provides the `max` property.
448 */
449 protected abstract get type (): PoolType
450
451 /**
452 * Gets the worker type.
453 */
454 protected abstract get worker (): WorkerType
455
456 /**
457 * Pool minimum size.
458 */
459 protected abstract get minSize (): number
460
461 /**
462 * Pool maximum size.
463 */
464 protected abstract get maxSize (): number
465
466 /**
467 * Get the worker given its id.
468 *
469 * @param workerId - The worker id.
470 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
471 */
472 private getWorkerById (workerId: number): Worker | undefined {
473 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
474 ?.worker
475 }
476
477 /**
478 * Checks if the worker id sent in the received message from a worker is valid.
479 *
480 * @param message - The received message.
481 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
482 */
483 private checkMessageWorkerId (message: MessageValue<Response>): void {
484 if (
485 message.workerId != null &&
486 this.getWorkerById(message.workerId) == null
487 ) {
488 throw new Error(
489 `Worker message received from unknown worker '${message.workerId}'`
490 )
491 }
492 }
493
494 /**
495 * Gets the given worker its worker node key.
496 *
497 * @param worker - The worker.
498 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
499 */
500 private getWorkerNodeKey (worker: Worker): number {
501 return this.workerNodes.findIndex(
502 workerNode => workerNode.worker === worker
503 )
504 }
505
506 /** @inheritDoc */
507 public setWorkerChoiceStrategy (
508 workerChoiceStrategy: WorkerChoiceStrategy,
509 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
510 ): void {
511 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
512 this.opts.workerChoiceStrategy = workerChoiceStrategy
513 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
514 this.opts.workerChoiceStrategy
515 )
516 if (workerChoiceStrategyOptions != null) {
517 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
518 }
519 for (const workerNode of this.workerNodes) {
520 workerNode.resetUsage()
521 this.setWorkerStatistics(workerNode.worker)
522 }
523 }
524
525 /** @inheritDoc */
526 public setWorkerChoiceStrategyOptions (
527 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
528 ): void {
529 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
530 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
531 this.workerChoiceStrategyContext.setOptions(
532 this.opts.workerChoiceStrategyOptions
533 )
534 }
535
536 /** @inheritDoc */
537 public enableTasksQueue (
538 enable: boolean,
539 tasksQueueOptions?: TasksQueueOptions
540 ): void {
541 if (this.opts.enableTasksQueue === true && !enable) {
542 this.flushTasksQueues()
543 }
544 this.opts.enableTasksQueue = enable
545 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
546 }
547
548 /** @inheritDoc */
549 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
550 if (this.opts.enableTasksQueue === true) {
551 this.checkValidTasksQueueOptions(tasksQueueOptions)
552 this.opts.tasksQueueOptions =
553 this.buildTasksQueueOptions(tasksQueueOptions)
554 } else if (this.opts.tasksQueueOptions != null) {
555 delete this.opts.tasksQueueOptions
556 }
557 }
558
559 private buildTasksQueueOptions (
560 tasksQueueOptions: TasksQueueOptions
561 ): TasksQueueOptions {
562 return {
563 concurrency: tasksQueueOptions?.concurrency ?? 1
564 }
565 }
566
567 /**
568 * Whether the pool is full or not.
569 *
570 * The pool filling boolean status.
571 */
572 protected get full (): boolean {
573 return this.workerNodes.length >= this.maxSize
574 }
575
576 /**
577 * Whether the pool is busy or not.
578 *
579 * The pool busyness boolean status.
580 */
581 protected abstract get busy (): boolean
582
583 /**
584 * Whether worker nodes are executing at least one task.
585 *
586 * @returns Worker nodes busyness boolean status.
587 */
588 protected internalBusy (): boolean {
589 return (
590 this.workerNodes.findIndex(workerNode => {
591 return workerNode.usage.tasks.executing === 0
592 }) === -1
593 )
594 }
595
596 /** @inheritDoc */
597 public async execute (data?: Data, name?: string): Promise<Response> {
598 const timestamp = performance.now()
599 const workerNodeKey = this.chooseWorkerNode()
600 const submittedTask: Task<Data> = {
601 name: name ?? DEFAULT_TASK_NAME,
602 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
603 data: data ?? ({} as Data),
604 timestamp,
605 workerId: this.getWorkerInfo(workerNodeKey).id as number,
606 id: randomUUID()
607 }
608 const res = new Promise<Response>((resolve, reject) => {
609 this.promiseResponseMap.set(submittedTask.id as string, {
610 resolve,
611 reject,
612 worker: this.workerNodes[workerNodeKey].worker
613 })
614 })
615 if (
616 this.opts.enableTasksQueue === true &&
617 (this.busy ||
618 this.workerNodes[workerNodeKey].usage.tasks.executing >=
619 ((this.opts.tasksQueueOptions as TasksQueueOptions)
620 .concurrency as number))
621 ) {
622 this.enqueueTask(workerNodeKey, submittedTask)
623 } else {
624 this.executeTask(workerNodeKey, submittedTask)
625 }
626 this.checkAndEmitEvents()
627 // eslint-disable-next-line @typescript-eslint/return-await
628 return res
629 }
630
631 /** @inheritDoc */
632 public async destroy (): Promise<void> {
633 await Promise.all(
634 this.workerNodes.map(async (workerNode, workerNodeKey) => {
635 this.flushTasksQueue(workerNodeKey)
636 // FIXME: wait for tasks to be finished
637 const workerExitPromise = new Promise<void>(resolve => {
638 workerNode.worker.on('exit', () => {
639 resolve()
640 })
641 })
642 await this.destroyWorker(workerNode.worker)
643 await workerExitPromise
644 })
645 )
646 }
647
648 /**
649 * Terminates the given worker.
650 *
651 * @param worker - A worker within `workerNodes`.
652 */
653 protected abstract destroyWorker (worker: Worker): void | Promise<void>
654
655 /**
656 * Setup hook to execute code before worker nodes are created in the abstract constructor.
657 * Can be overridden.
658 *
659 * @virtual
660 */
661 protected setupHook (): void {
662 // Intentionally empty
663 }
664
665 /**
666 * Should return whether the worker is the main worker or not.
667 */
668 protected abstract isMain (): boolean
669
670 /**
671 * Hook executed before the worker task execution.
672 * Can be overridden.
673 *
674 * @param workerNodeKey - The worker node key.
675 * @param task - The task to execute.
676 */
677 protected beforeTaskExecutionHook (
678 workerNodeKey: number,
679 task: Task<Data>
680 ): void {
681 const workerUsage = this.workerNodes[workerNodeKey].usage
682 ++workerUsage.tasks.executing
683 this.updateWaitTimeWorkerUsage(workerUsage, task)
684 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
685 task.name as string
686 ) as WorkerUsage
687 ++taskWorkerUsage.tasks.executing
688 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
689 }
690
691 /**
692 * Hook executed after the worker task execution.
693 * Can be overridden.
694 *
695 * @param worker - The worker.
696 * @param message - The received message.
697 */
698 protected afterTaskExecutionHook (
699 worker: Worker,
700 message: MessageValue<Response>
701 ): void {
702 const workerNodeKey = this.getWorkerNodeKey(worker)
703 const workerUsage = this.workerNodes[workerNodeKey].usage
704 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
705 this.updateRunTimeWorkerUsage(workerUsage, message)
706 this.updateEluWorkerUsage(workerUsage, message)
707 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
708 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
709 ) as WorkerUsage
710 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
711 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
712 this.updateEluWorkerUsage(taskWorkerUsage, message)
713 }
714
715 private updateTaskStatisticsWorkerUsage (
716 workerUsage: WorkerUsage,
717 message: MessageValue<Response>
718 ): void {
719 const workerTaskStatistics = workerUsage.tasks
720 --workerTaskStatistics.executing
721 if (message.taskError == null) {
722 ++workerTaskStatistics.executed
723 } else {
724 ++workerTaskStatistics.failed
725 }
726 }
727
728 private updateRunTimeWorkerUsage (
729 workerUsage: WorkerUsage,
730 message: MessageValue<Response>
731 ): void {
732 if (
733 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
734 .aggregate
735 ) {
736 const taskRunTime = message.taskPerformance?.runTime ?? 0
737 workerUsage.runTime.aggregate =
738 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
739 workerUsage.runTime.minimum = Math.min(
740 taskRunTime,
741 workerUsage.runTime?.minimum ?? Infinity
742 )
743 workerUsage.runTime.maximum = Math.max(
744 taskRunTime,
745 workerUsage.runTime?.maximum ?? -Infinity
746 )
747 if (
748 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
749 .average &&
750 workerUsage.tasks.executed !== 0
751 ) {
752 workerUsage.runTime.average =
753 workerUsage.runTime.aggregate / workerUsage.tasks.executed
754 }
755 if (
756 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
757 .median &&
758 message.taskPerformance?.runTime != null
759 ) {
760 workerUsage.runTime.history.push(message.taskPerformance.runTime)
761 workerUsage.runTime.median = median(workerUsage.runTime.history)
762 }
763 }
764 }
765
766 private updateWaitTimeWorkerUsage (
767 workerUsage: WorkerUsage,
768 task: Task<Data>
769 ): void {
770 const timestamp = performance.now()
771 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
772 if (
773 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
774 .aggregate
775 ) {
776 workerUsage.waitTime.aggregate =
777 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
778 workerUsage.waitTime.minimum = Math.min(
779 taskWaitTime,
780 workerUsage.waitTime?.minimum ?? Infinity
781 )
782 workerUsage.waitTime.maximum = Math.max(
783 taskWaitTime,
784 workerUsage.waitTime?.maximum ?? -Infinity
785 )
786 if (
787 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
788 .waitTime.average &&
789 workerUsage.tasks.executed !== 0
790 ) {
791 workerUsage.waitTime.average =
792 workerUsage.waitTime.aggregate / workerUsage.tasks.executed
793 }
794 if (
795 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
796 .waitTime.median
797 ) {
798 workerUsage.waitTime.history.push(taskWaitTime)
799 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
800 }
801 }
802 }
803
804 private updateEluWorkerUsage (
805 workerUsage: WorkerUsage,
806 message: MessageValue<Response>
807 ): void {
808 if (
809 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
810 .aggregate
811 ) {
812 if (message.taskPerformance?.elu != null) {
813 workerUsage.elu.idle.aggregate =
814 (workerUsage.elu.idle?.aggregate ?? 0) +
815 message.taskPerformance.elu.idle
816 workerUsage.elu.active.aggregate =
817 (workerUsage.elu.active?.aggregate ?? 0) +
818 message.taskPerformance.elu.active
819 if (workerUsage.elu.utilization != null) {
820 workerUsage.elu.utilization =
821 (workerUsage.elu.utilization +
822 message.taskPerformance.elu.utilization) /
823 2
824 } else {
825 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
826 }
827 workerUsage.elu.idle.minimum = Math.min(
828 message.taskPerformance.elu.idle,
829 workerUsage.elu.idle?.minimum ?? Infinity
830 )
831 workerUsage.elu.idle.maximum = Math.max(
832 message.taskPerformance.elu.idle,
833 workerUsage.elu.idle?.maximum ?? -Infinity
834 )
835 workerUsage.elu.active.minimum = Math.min(
836 message.taskPerformance.elu.active,
837 workerUsage.elu.active?.minimum ?? Infinity
838 )
839 workerUsage.elu.active.maximum = Math.max(
840 message.taskPerformance.elu.active,
841 workerUsage.elu.active?.maximum ?? -Infinity
842 )
843 if (
844 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
845 .average &&
846 workerUsage.tasks.executed !== 0
847 ) {
848 workerUsage.elu.idle.average =
849 workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
850 workerUsage.elu.active.average =
851 workerUsage.elu.active.aggregate / workerUsage.tasks.executed
852 }
853 if (
854 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
855 .median
856 ) {
857 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
858 workerUsage.elu.active.history.push(
859 message.taskPerformance.elu.active
860 )
861 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
862 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
863 }
864 }
865 }
866 }
867
868 /**
869 * Chooses a worker node for the next task.
870 *
871 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
872 *
873 * @returns The worker node key
874 */
875 private chooseWorkerNode (): number {
876 if (this.shallCreateDynamicWorker()) {
877 const worker = this.createAndSetupDynamicWorker()
878 if (
879 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
880 ) {
881 return this.getWorkerNodeKey(worker)
882 }
883 }
884 return this.workerChoiceStrategyContext.execute()
885 }
886
887 /**
888 * Conditions for dynamic worker creation.
889 *
890 * @returns Whether to create a dynamic worker or not.
891 */
892 private shallCreateDynamicWorker (): boolean {
893 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
894 }
895
896 /**
897 * Sends a message to the given worker.
898 *
899 * @param worker - The worker which should receive the message.
900 * @param message - The message.
901 */
902 protected abstract sendToWorker (
903 worker: Worker,
904 message: MessageValue<Data>
905 ): void
906
907 /**
908 * Creates a new worker.
909 *
910 * @returns Newly created worker.
911 */
912 protected abstract createWorker (): Worker
913
914 /**
915 * Creates a new worker and sets it up completely in the pool worker nodes.
916 *
917 * @returns New, completely set up worker.
918 */
919 protected createAndSetupWorker (): Worker {
920 const worker = this.createWorker()
921
922 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
923 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
924 worker.on('error', error => {
925 const workerNodeKey = this.getWorkerNodeKey(worker)
926 const workerInfo = this.getWorkerInfo(workerNodeKey)
927 workerInfo.ready = false
928 if (this.emitter != null) {
929 this.emitter.emit(PoolEvents.error, error)
930 }
931 if (this.opts.restartWorkerOnError === true && !this.starting) {
932 if (workerInfo.dynamic) {
933 this.createAndSetupDynamicWorker()
934 } else {
935 this.createAndSetupWorker()
936 }
937 }
938 if (this.opts.enableTasksQueue === true) {
939 this.redistributeQueuedTasks(workerNodeKey)
940 }
941 })
942 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
943 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
944 worker.once('exit', () => {
945 this.removeWorkerNode(worker)
946 })
947
948 this.pushWorkerNode(worker)
949
950 this.afterWorkerSetup(worker)
951
952 return worker
953 }
954
955 /**
956 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
957 *
958 * @returns New, completely set up dynamic worker.
959 */
960 protected createAndSetupDynamicWorker (): Worker {
961 const worker = this.createAndSetupWorker()
962 this.registerWorkerMessageListener(worker, message => {
963 const workerNodeKey = this.getWorkerNodeKey(worker)
964 if (
965 isKillBehavior(KillBehaviors.HARD, message.kill) ||
966 (message.kill != null &&
967 ((this.opts.enableTasksQueue === false &&
968 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
969 (this.opts.enableTasksQueue === true &&
970 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
971 this.tasksQueueSize(workerNodeKey) === 0)))
972 ) {
973 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
974 void (this.destroyWorker(worker) as Promise<void>)
975 }
976 })
977 const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
978 workerInfo.dynamic = true
979 this.sendToWorker(worker, {
980 checkAlive: true,
981 workerId: workerInfo.id as number
982 })
983 return worker
984 }
985
986 /**
987 * Registers a listener callback on the given worker.
988 *
989 * @param worker - The worker which should register a listener.
990 * @param listener - The message listener callback.
991 */
992 private registerWorkerMessageListener<Message extends Data | Response>(
993 worker: Worker,
994 listener: (message: MessageValue<Message>) => void
995 ): void {
996 worker.on('message', listener as MessageHandler<Worker>)
997 }
998
999 /**
1000 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
1001 * Can be overridden.
1002 *
1003 * @param worker - The newly created worker.
1004 */
1005 protected afterWorkerSetup (worker: Worker): void {
1006 // Listen to worker messages.
1007 this.registerWorkerMessageListener(worker, this.workerListener())
1008 // Send startup message to worker.
1009 this.sendToWorker(worker, {
1010 ready: false,
1011 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
1012 })
1013 // Setup worker task statistics computation.
1014 this.setWorkerStatistics(worker)
1015 }
1016
1017 private redistributeQueuedTasks (workerNodeKey: number): void {
1018 while (this.tasksQueueSize(workerNodeKey) > 0) {
1019 let targetWorkerNodeKey: number = workerNodeKey
1020 let minQueuedTasks = Infinity
1021 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1022 const workerInfo = this.getWorkerInfo(workerNodeId)
1023 if (
1024 workerNodeId !== workerNodeKey &&
1025 workerInfo.ready &&
1026 workerNode.usage.tasks.queued === 0
1027 ) {
1028 targetWorkerNodeKey = workerNodeId
1029 break
1030 }
1031 if (
1032 workerNodeId !== workerNodeKey &&
1033 workerInfo.ready &&
1034 workerNode.usage.tasks.queued < minQueuedTasks
1035 ) {
1036 minQueuedTasks = workerNode.usage.tasks.queued
1037 targetWorkerNodeKey = workerNodeId
1038 }
1039 }
1040 this.enqueueTask(
1041 targetWorkerNodeKey,
1042 this.dequeueTask(workerNodeKey) as Task<Data>
1043 )
1044 }
1045 }
1046
1047 /**
1048 * This function is the listener registered for each worker message.
1049 *
1050 * @returns The listener function to execute when a message is received from a worker.
1051 */
1052 protected workerListener (): (message: MessageValue<Response>) => void {
1053 return message => {
1054 this.checkMessageWorkerId(message)
1055 if (message.ready != null && message.workerId != null) {
1056 // Worker ready message received
1057 this.handleWorkerReadyMessage(message)
1058 } else if (message.id != null) {
1059 // Task execution response received
1060 this.handleTaskExecutionResponse(message)
1061 }
1062 }
1063 }
1064
1065 private handleWorkerReadyMessage (message: MessageValue<Response>): void {
1066 const worker = this.getWorkerById(message.workerId)
1067 this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
1068 message.ready as boolean
1069 if (this.emitter != null && this.ready) {
1070 this.emitter.emit(PoolEvents.ready, this.info)
1071 }
1072 }
1073
1074 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1075 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1076 if (promiseResponse != null) {
1077 if (message.taskError != null) {
1078 if (this.emitter != null) {
1079 this.emitter.emit(PoolEvents.taskError, message.taskError)
1080 }
1081 promiseResponse.reject(message.taskError.message)
1082 } else {
1083 promiseResponse.resolve(message.data as Response)
1084 }
1085 this.afterTaskExecutionHook(promiseResponse.worker, message)
1086 this.promiseResponseMap.delete(message.id as string)
1087 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1088 if (
1089 this.opts.enableTasksQueue === true &&
1090 this.tasksQueueSize(workerNodeKey) > 0
1091 ) {
1092 this.executeTask(
1093 workerNodeKey,
1094 this.dequeueTask(workerNodeKey) as Task<Data>
1095 )
1096 }
1097 this.workerChoiceStrategyContext.update(workerNodeKey)
1098 }
1099 }
1100
1101 private checkAndEmitEvents (): void {
1102 if (this.emitter != null) {
1103 if (this.busy) {
1104 this.emitter.emit(PoolEvents.busy, this.info)
1105 }
1106 if (this.type === PoolTypes.dynamic && this.full) {
1107 this.emitter.emit(PoolEvents.full, this.info)
1108 }
1109 }
1110 }
1111
1112 /**
1113 * Gets the worker information.
1114 *
1115 * @param workerNodeKey - The worker node key.
1116 */
1117 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1118 return this.workerNodes[workerNodeKey].info
1119 }
1120
1121 /**
1122 * Pushes the given worker in the pool worker nodes.
1123 *
1124 * @param worker - The worker.
1125 * @returns The worker nodes length.
1126 */
1127 private pushWorkerNode (worker: Worker): number {
1128 return this.workerNodes.push(new WorkerNode(worker, this.worker))
1129 }
1130
1131 /**
1132 * Removes the given worker from the pool worker nodes.
1133 *
1134 * @param worker - The worker.
1135 */
1136 private removeWorkerNode (worker: Worker): void {
1137 const workerNodeKey = this.getWorkerNodeKey(worker)
1138 if (workerNodeKey !== -1) {
1139 this.workerNodes.splice(workerNodeKey, 1)
1140 this.workerChoiceStrategyContext.remove(workerNodeKey)
1141 }
1142 }
1143
1144 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1145 this.beforeTaskExecutionHook(workerNodeKey, task)
1146 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1147 }
1148
1149 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1150 return this.workerNodes[workerNodeKey].enqueueTask(task)
1151 }
1152
1153 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1154 return this.workerNodes[workerNodeKey].dequeueTask()
1155 }
1156
1157 private tasksQueueSize (workerNodeKey: number): number {
1158 return this.workerNodes[workerNodeKey].tasksQueueSize()
1159 }
1160
1161 private flushTasksQueue (workerNodeKey: number): void {
1162 while (this.tasksQueueSize(workerNodeKey) > 0) {
1163 this.executeTask(
1164 workerNodeKey,
1165 this.dequeueTask(workerNodeKey) as Task<Data>
1166 )
1167 }
1168 this.workerNodes[workerNodeKey].clearTasksQueue()
1169 }
1170
1171 private flushTasksQueues (): void {
1172 for (const [workerNodeKey] of this.workerNodes.entries()) {
1173 this.flushTasksQueue(workerNodeKey)
1174 }
1175 }
1176
1177 private setWorkerStatistics (worker: Worker): void {
1178 this.sendToWorker(worker, {
1179 statistics: {
1180 runTime:
1181 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1182 .runTime.aggregate,
1183 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1184 .elu.aggregate
1185 },
1186 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
1187 })
1188 }
1189 }