Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_TASK_NAME,
6 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
7 EMPTY_FUNCTION,
8 isKillBehavior,
9 isPlainObject,
10 median,
11 round
12 } from '../utils'
13 import { KillBehaviors } from '../worker/worker-options'
14 import {
15 type IPool,
16 PoolEmitter,
17 PoolEvents,
18 type PoolInfo,
19 type PoolOptions,
20 type PoolType,
21 PoolTypes,
22 type TasksQueueOptions
23 } from './pool'
24 import type {
25 IWorker,
26 IWorkerNode,
27 MessageHandler,
28 Task,
29 WorkerInfo,
30 WorkerType,
31 WorkerUsage
32 } from './worker'
33 import {
34 Measurements,
35 WorkerChoiceStrategies,
36 type WorkerChoiceStrategy,
37 type WorkerChoiceStrategyOptions
38 } from './selection-strategies/selection-strategies-types'
39 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
40 import { version } from './version'
41 import { WorkerNode } from './worker-node'
42
43 /**
44 * Base class that implements some shared logic for all poolifier pools.
45 *
46 * @typeParam Worker - Type of worker which manages this pool.
47 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
48 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
49 */
50 export abstract class AbstractPool<
51 Worker extends IWorker,
52 Data = unknown,
53 Response = unknown
54 > implements IPool<Worker, Data, Response> {
55 /** @inheritDoc */
56 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
57
58 /** @inheritDoc */
59 public readonly emitter?: PoolEmitter
60
61 /**
62 * The execution response promise map.
63 *
64 * - `key`: The message id of each submitted task.
65 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
66 *
67 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
68 */
69 protected promiseResponseMap: Map<
70 string,
71 PromiseResponseWrapper<Worker, Response>
72 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
73
74 /**
75 * Worker choice strategy context referencing a worker choice algorithm implementation.
76 */
77 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78 Worker,
79 Data,
80 Response
81 >
82
/**
 * The start timestamp of the pool, taken from `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp: number
87
/**
 * Builds a new poolifier pool: validates the arguments, wires the optional event emitter and
 * the worker choice strategy context, then creates worker nodes until `numberOfWorkers` exist.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if `isMain()` reports that the caller is not the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation. checkPoolOptions() also writes the option defaults into `this.opts`.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below so they can be handed around as plain functions.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Subclass hook, run before any worker node is created.
  this.setupHook()

  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
133
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is missing, is not a string, or contains only whitespace.
 */
private checkFilePath (filePath: string): void {
  // The static type is `string`, but untyped callers can pass anything: reject every
  // non-string value (including null and undefined), not only nullish ones.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
142
/**
 * Validates the number of workers requested for the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is missing.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
160
/**
 * Validates the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - Minimum pool size.
 * @param max - Maximum pool size.
 * @throws RangeError if `min` is greater than `max`, if both are zero, or if they are equal.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  // The all-zero case is reported before the generic "equal sizes" case.
  if (min === 0 && max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
178
/**
 * Validates the pool options and writes the defaults for unset options into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and normalized when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
205
/**
 * Ensures the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not a known value.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
215
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` is set but does not have `maxSize` keys, or if `measurement`
 * is set but is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  const weights = workerChoiceStrategyOptions.weights
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  const measurement = workerChoiceStrategyOptions.measurement
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
243
/**
 * Validates the tasks queue options. Nullish options and a nullish `concurrency` are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are set but not a plain object, or if `concurrency` is
 * not a safe integer.
 * @throws Error if `concurrency` is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
267
/**
 * @inheritDoc
 *
 * The snapshot is computed from the current worker nodes on every access.
 * `utilization`, `runTime` and `waitTime` are only present when the active worker choice
 * strategy requires the corresponding aggregates. An average is reported as `0` (instead of
 * `NaN`) while no task has been executed yet.
 */
public get info (): PoolInfo {
  // Structural view of the per-node measurement statistics read below.
  interface MeasurementLike {
    minimum?: number
    maximum?: number
    aggregate?: number
    median?: number
  }
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums one numeric value per worker node, in worker node order.
  const sumOver = (
    select: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + select(workerNode),
      0
    )
  const executedTasks = sumOver(workerNode => workerNode.usage.tasks.executed)
  // Builds the pool-wide minimum/maximum/average(/median) of one measurement.
  const buildMeasurement = (
    select: (
      workerNode: IWorkerNode<Worker, Data>
    ) => MeasurementLike | undefined,
    withMedian: boolean
  ) => {
    const aggregate = sumOver(
      workerNode => select(workerNode)?.aggregate ?? 0
    )
    return {
      minimum: round(
        Math.min(
          ...this.workerNodes.map(
            workerNode => select(workerNode)?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        Math.max(
          ...this.workerNodes.map(
            workerNode => select(workerNode)?.maximum ?? -Infinity
          )
        )
      ),
      // Guard the division: 0 / 0 would otherwise leak NaN into the pool information.
      average: round(executedTasks === 0 ? 0 : aggregate / executedTasks),
      ...(withMedian && {
        median: round(
          median(
            this.workerNodes.map(
              workerNode => select(workerNode)?.median ?? 0
            )
          )
        )
      })
    }
  }
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(requirements.runTime.aggregate &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOver(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOver(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks,
    executingTasks: sumOver(workerNode => workerNode.usage.tasks.executing),
    queuedTasks: sumOver(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sumOver(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOver(workerNode => workerNode.usage.tasks.failed),
    ...(requirements.runTime.aggregate && {
      runTime: buildMeasurement(
        workerNode => workerNode.usage.runTime,
        requirements.runTime.median
      )
    }),
    ...(requirements.waitTime.aggregate && {
      waitTime: buildMeasurement(
        workerNode => workerNode.usage.waitTime,
        requirements.waitTime.median
      )
    })
  }
}
404
/**
 * Whether the pool is still starting: fewer than `minSize` worker nodes exist, or at least
 * one existing worker node is not flagged ready.
 */
private get starting (): boolean {
  if (this.workerNodes.length < this.minSize) {
    return true
  }
  return this.workerNodes.some(workerNode => !workerNode.info.ready)
}
412
/**
 * Whether the pool is ready: at least `minSize` worker nodes exist and every worker node is
 * flagged ready.
 */
private get ready (): boolean {
  if (this.workerNodes.length < this.minSize) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.info.ready)
}
419
/**
 * Gets the approximate pool utilization: the summed tasks run time and wait time of all
 * worker nodes, divided by the time elapsed since the pool start multiplied by `maxSize`.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize

  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }

  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }

  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
440
441 /**
442 * Pool type.
443 *
444 * If it is `'dynamic'`, it provides the `max` property.
445 */
446 protected abstract get type (): PoolType
447
448 /**
449 * Gets the worker type.
450 */
451 protected abstract get worker (): WorkerType
452
453 /**
454 * Pool minimum size.
455 */
456 protected abstract get minSize (): number
457
458 /**
459 * Pool maximum size.
460 */
461 protected abstract get maxSize (): number
462
/**
 * Looks up a worker by the id stored in its worker node information.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose id matches, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
473
/**
 * Rejects messages that carry a worker id not belonging to any pool worker node.
 * Messages without a worker id are accepted.
 *
 * @param message - The received message.
 * @throws Error if the message worker id is unknown to the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const workerId = message.workerId
  if (workerId == null) {
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
484
/**
 * Gets the worker node key (index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
496
/**
 * @inheritDoc
 *
 * After switching strategy, the usage of every worker node is reset and the statistics
 * settings are sent again to each worker.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
515
/**
 * @inheritDoc
 *
 * The options are validated, stored in the pool options and forwarded to the worker choice
 * strategy context.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
526
/**
 * @inheritDoc
 *
 * When the queue goes from enabled to disabled, the tasks queues are flushed first.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
538
/**
 * @inheritDoc
 *
 * With the tasks queue disabled, any stored tasks queue options are removed instead.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
549
/**
 * Builds normalized tasks queue options.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly nullish.
 * @returns Options whose `concurrency` is the given one, or `1` when it is not set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
557
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
566
567 /**
568 * Whether the pool is busy or not.
569 *
570 * The pool busyness boolean status.
571 */
572 protected abstract get busy (): boolean
573
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` when no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
586
/**
 * @inheritDoc
 *
 * The task is queued on the chosen worker node when the tasks queue is enabled and either
 * the pool is busy or that node already executes `concurrency` tasks; otherwise it is sent
 * to the worker right away.
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  const submissionTimestamp = performance.now()
  const chosenWorkerNodeKey = this.chooseWorkerNode()
  const chosenWorkerNode = this.workerNodes[chosenWorkerNodeKey]
  const task: Task<Data> = {
    name: name ?? DEFAULT_TASK_NAME,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp: submissionTimestamp,
    workerId: chosenWorkerNode.info.id as number,
    id: randomUUID()
  }
  // The resolve/reject callbacks are stored under the task id until the response arrives.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      worker: chosenWorkerNode.worker
    })
  })
  const mustQueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      chosenWorkerNode.usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustQueue) {
    this.enqueueTask(chosenWorkerNodeKey, task)
  } else {
    this.executeTask(chosenWorkerNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
621
/**
 * @inheritDoc
 *
 * For every worker node: flushes its tasks queue, destroys the worker and waits for the
 * worker `'exit'` event. All worker nodes are handled concurrently.
 */
public async destroy (): Promise<void> {
  const terminateWorkerNode = async (
    workerNode: IWorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // The exit listener is registered before the worker is asked to terminate.
    const workerExited = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExited
  }
  await Promise.all(this.workerNodes.map(terminateWorkerNode))
}
638
639 /**
640 * Terminates the given worker.
641 *
642 * @param worker - A worker within `workerNodes`.
643 */
644 protected abstract destroyWorker (worker: Worker): void | Promise<void>
645
/**
 * Extension point called by the abstract constructor right before the worker nodes are created.
 * The default implementation does nothing; subclasses can override it.
 *
 * @virtual
 */
protected setupHook (): void {
  // No-op by default
}
655
656 /**
657 * Should return whether the worker is the main worker or not.
658 */
659 protected abstract isMain (): boolean
660
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics, first on the
 * worker node usage, then on the usage tracked for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]

  const nodeUsage = workerNode.usage
  nodeUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(nodeUsage, task)

  const taskNameUsage = workerNode.getTasksWorkerUsage(
    task.name as string
  ) as WorkerUsage
  taskNameUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(taskNameUsage, task)
}
681
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task counters, run time and ELU statistics, first on the worker node usage,
 * then on the usage tracked for the task name.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const recordExecution = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  recordExecution(workerNode.usage)
  recordExecution(
    workerNode.getTasksWorkerUsage(message.name as string) as WorkerUsage
  )
}
705
/**
 * Updates the task counters once a task response has been received: one less executing
 * task, and one more executed or failed task depending on `message.taskError`.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const taskCounters = workerUsage.tasks
  taskCounters.executing -= 1
  if (message.taskError != null) {
    taskCounters.failed += 1
  } else {
    taskCounters.executed += 1
  }
}
718
/**
 * Updates the run time statistics of the given worker usage from a task response.
 * Does nothing unless the worker choice strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message; a missing run time counts as `0`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  const runTimeAggregate = (workerUsage.runTime.aggregate ?? 0) + taskRunTime
  workerUsage.runTime.aggregate = runTimeAggregate
  workerUsage.runTime.minimum = Math.min(
    taskRunTime,
    workerUsage.runTime?.minimum ?? Infinity
  )
  workerUsage.runTime.maximum = Math.max(
    taskRunTime,
    workerUsage.runTime?.maximum ?? -Infinity
  )
  // The average is left untouched while no task has been executed.
  if (runTimeRequirements.average && workerUsage.tasks.executed !== 0) {
    workerUsage.runTime.average = runTimeAggregate / workerUsage.tasks.executed
  }
  // The median history only records run times actually reported by the worker.
  const reportedRunTime = message.taskPerformance?.runTime
  if (runTimeRequirements.median && reportedRunTime != null) {
    workerUsage.runTime.history.push(reportedRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
756
/**
 * Updates the wait time statistics of the given worker usage for a task about to be executed.
 * The wait time is the time elapsed since `task.timestamp` (`0` when the task has no timestamp).
 * Does nothing unless the worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  const waitTimeAggregate =
    (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
  workerUsage.waitTime.aggregate = waitTimeAggregate
  workerUsage.waitTime.minimum = Math.min(
    taskWaitTime,
    workerUsage.waitTime?.minimum ?? Infinity
  )
  workerUsage.waitTime.maximum = Math.max(
    taskWaitTime,
    workerUsage.waitTime?.maximum ?? -Infinity
  )
  // The average is left untouched while no task has been executed.
  if (waitTimeRequirements.average && workerUsage.tasks.executed !== 0) {
    workerUsage.waitTime.average =
      waitTimeAggregate / workerUsage.tasks.executed
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
794
/**
 * Updates the ELU (idle, active, utilization) statistics of the given worker usage from a
 * task response. Does nothing unless the worker choice strategy requires the ELU aggregate
 * and the message carries ELU figures.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    return
  }

  const idleAggregate = (workerUsage.elu.idle?.aggregate ?? 0) + taskElu.idle
  workerUsage.elu.idle.aggregate = idleAggregate
  const activeAggregate =
    (workerUsage.elu.active?.aggregate ?? 0) + taskElu.active
  workerUsage.elu.active.aggregate = activeAggregate

  // The stored utilization is the mean of the previous value and the new one,
  // or the new one when nothing was stored yet.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization

  workerUsage.elu.idle.minimum = Math.min(
    taskElu.idle,
    workerUsage.elu.idle?.minimum ?? Infinity
  )
  workerUsage.elu.idle.maximum = Math.max(
    taskElu.idle,
    workerUsage.elu.idle?.maximum ?? -Infinity
  )
  workerUsage.elu.active.minimum = Math.min(
    taskElu.active,
    workerUsage.elu.active?.minimum ?? Infinity
  )
  workerUsage.elu.active.maximum = Math.max(
    taskElu.active,
    workerUsage.elu.active?.maximum ?? -Infinity
  )

  // The averages are left untouched while no task has been executed.
  if (eluRequirements.average && workerUsage.tasks.executed !== 0) {
    workerUsage.elu.idle.average = idleAggregate / workerUsage.tasks.executed
    workerUsage.elu.active.average =
      activeAggregate / workerUsage.tasks.executed
  }
  if (eluRequirements.median) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
858
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker has to be created and the strategy policy asks to use it, the new
 * worker node is chosen; in every other case the choice is delegated to the worker choice
 * strategy context.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
877
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full and every
 * worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
886
887 /**
888 * Sends a message to the given worker.
889 *
890 * @param worker - The worker which should receive the message.
891 * @param message - The message.
892 */
893 protected abstract sendToWorker (
894 worker: Worker,
895 message: MessageValue<Data>
896 ): void
897
/**
 * Subscribes the given callback to the `'message'` event of the given worker.
 *
 * @param worker - The worker on which the listener is registered.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
910
911 /**
912 * Creates a new worker.
913 *
914 * @returns Newly created worker.
915 */
916 protected abstract createWorker (): Worker
917
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * Can be overridden.
 *
 * The default implementation registers the pool message listener on the worker, sends it a
 * startup message carrying its worker id, and sends it the task statistics settings.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Pool-side handling of the worker messages.
  const poolListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolListener)
  // Startup message.
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  this.sendToWorker(worker, {
    ready: false,
    workerId: workerInfo.id as number
  })
  // Task statistics settings.
  this.setWorkerStatistics(worker)
}
935
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * The user supplied handlers from the pool options are registered first, then the pool own
 * `'error'` and `'exit'` listeners; the worker is finally pushed to the worker nodes and
 * `afterWorkerSetup()` is called.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    // If the 'exit' listener below already removed this worker's node, the key is -1 and
    // `workerNodes[-1]` is undefined: there is nothing left to redistribute or to restart
    // from, and dereferencing it would throw from inside the event handler.
    const workerNodeKey = this.getWorkerNodeKey(worker)
    if (workerNodeKey === -1) {
      return
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(worker)
    }
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      // Replace the failed worker with one of the same kind.
      if (this.getWorkerInfo(workerNodeKey).dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
973
/**
 * Moves the tasks queued on the given worker's node to the other worker nodes.
 *
 * Each task goes to the first other worker node with an empty queue or, failing that, to the
 * other worker node with the fewest queued tasks. Nothing is moved when the worker is not in
 * the pool or when no other worker node exists.
 *
 * @param worker - The worker whose queued tasks must be redistributed.
 */
private redistributeQueuedTasks (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    // No other worker node can take the task: stop here. Re-enqueueing on the same node
    // would never shrink its queue and the loop would not terminate.
    if (targetWorkerNodeKey === -1) {
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1001
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, a message listener destroys the worker when it sends a kill
 * message, the worker node information is flagged `dynamic`, and a `checkAlive` message is
 * sent to the worker.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // A hard kill is always honored. Any other kill message is honored only when the worker
    // executes no task and, with the tasks queue enabled, has no queued task either.
    let shallDestroy = false
    if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
      shallDestroy = true
    } else if (message.kill != null) {
      if (this.opts.enableTasksQueue === false) {
        shallDestroy =
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      } else if (this.opts.enableTasksQueue === true) {
        shallDestroy =
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
      }
    }
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  dynamicWorkerInfo.dynamic = true
  this.sendToWorker(worker, {
    checkAlive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  return worker
}
1032
/**
 * This function is the listener registered for each worker message.
 *
 * The listener validates the message worker id, then dispatches worker ready messages and
 * task execution responses to their handlers; other messages are ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    const isReadyMessage = message.ready != null && message.workerId != null
    if (isReadyMessage) {
      // Worker ready message
      this.handleWorkerReadyMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response
      this.handleTaskExecutionResponse(message)
    }
  }
}
1050
/**
 * Stores the ready flag sent by a worker in its worker node information and, when events
 * are enabled and the whole pool is ready, emits the `ready` pool event.
 *
 * @param message - The worker ready message.
 */
private handleWorkerReadyMessage (message: MessageValue<Response>): void {
  const readyWorker = this.getWorkerById(message.workerId) as Worker
  const readyWorkerInfo = this.getWorkerInfo(
    this.getWorkerNodeKey(readyWorker)
  )
  readyWorkerInfo.ready = message.ready as boolean
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1059
/**
 * Settles the promise bound to a task execution response, updates the worker usage, runs
 * the next queued task of the worker node if any, and notifies the worker choice strategy
 * context. Responses whose id is not in the promise response map are ignored.
 *
 * @param message - The task execution response.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  if (pendingResponse == null) {
    return
  }
  const taskError = message.taskError
  if (taskError == null) {
    pendingResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    // The promise is rejected with the error message string.
    pendingResponse.reject(taskError.message)
  }
  this.afterTaskExecutionHook(pendingResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(pendingResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1086
/**
 * Emits the `busy` event when the pool is busy and the `full` event when a dynamic pool is
 * full. Does nothing when events are disabled.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1097
/**
 * Gets the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker node information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1106
/**
 * Wraps the given worker in a new worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode = new WorkerNode(worker, this.worker)
  return this.workerNodes.push(workerNode)
}
1116
/**
 * Removes the given worker from the pool worker nodes and from the worker choice strategy
 * context. Does nothing if the worker is not in the pool worker nodes.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1129
/**
 * Runs the before task execution hook, then sends the task to the worker of the given node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const targetWorker = this.workerNodes[workerNodeKey].worker
  this.sendToWorker(targetWorker, task)
}
1134
/**
 * Queues the task on the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1138
/**
 * Takes the next task from the queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `dequeueTask()`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1142
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1146
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  for (
    let pendingTasks = this.tasksQueueSize(workerNodeKey);
    pendingTasks > 0;
    pendingTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1156
/**
 * Flushes the tasks queue of every worker node, in worker node order.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1162
/**
 * Sends to the given worker which task statistics (run time, ELU) the current worker choice
 * strategy requires, together with the worker id.
 *
 * @param worker - The worker receiving the statistics settings.
 */
private setWorkerStatistics (worker: Worker): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: requirements.runTime.aggregate,
    elu: requirements.elu.aggregate
  }
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  this.sendToWorker(worker, {
    statistics,
    workerId: workerInfo.id as number
  })
}
1175 }