fix: test for worker file existence
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round
17 } from '../utils'
18 import { KillBehaviors } from '../worker/worker-options'
19 import {
20 type IPool,
21 PoolEmitter,
22 PoolEvents,
23 type PoolInfo,
24 type PoolOptions,
25 type PoolType,
26 PoolTypes,
27 type TasksQueueOptions
28 } from './pool'
29 import type {
30 IWorker,
31 IWorkerNode,
32 MessageHandler,
33 WorkerInfo,
34 WorkerType,
35 WorkerUsage
36 } from './worker'
37 import {
38 Measurements,
39 WorkerChoiceStrategies,
40 type WorkerChoiceStrategy,
41 type WorkerChoiceStrategyOptions
42 } from './selection-strategies/selection-strategies-types'
43 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
44 import { version } from './version'
45 import { WorkerNode } from './worker-node'
46
47 /**
48 * Base class that implements some shared logic for all poolifier pools.
49 *
50 * @typeParam Worker - Type of worker which manages this pool.
51 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
52 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
53 */
54 export abstract class AbstractPool<
55 Worker extends IWorker,
56 Data = unknown,
57 Response = unknown
58 > implements IPool<Worker, Data, Response> {
59 /** @inheritDoc */
60 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
61
62 /** @inheritDoc */
63 public readonly emitter?: PoolEmitter
64
65 /**
66 * The execution response promise map.
67 *
68 * - `key`: The message id of each submitted task.
69 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
70 *
71 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
72 */
73 protected promiseResponseMap: Map<
74 string,
75 PromiseResponseWrapper<Worker, Response>
76 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * The start timestamp of the pool.
89 */
90 private readonly startTimestamp
91
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first, then the worker choice strategy context is
 * created, the setup hook is run and worker nodes are created until
 * `numberOfWorkers` of them exist.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Validate the arguments; checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the methods that get handed around so that `this` stays the pool.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // Fill the pool up to the requested number of worker nodes.
  while (this.workerNodes.length < numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
137
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
150
/**
 * Checks that the number of workers is a safe, non negative integer,
 * and non zero when the pool is a fixed pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
168
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing if the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (min === 0 && max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
186
/**
 * Validates the pool options and stores the effective values,
 * defaults included, in `this.opts`.
 *
 * @param opts - Options for the pool.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)
  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  if (poolOptions.enableTasksQueue) {
    // Tasks queue options only matter when the tasks queue is enabled.
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
213
/**
 * Checks that the given worker choice strategy is one of the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
223
/**
 * Checks the worker choice strategy options: they must be a plain object,
 * the weights (if any) must have one entry per worker node of a pool at its
 * maximum size, and the measurement (if any) must be one of the values of `Measurements`.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
251
/**
 * Checks the tasks queue options: when given they must be a plain object and
 * the concurrency, when given, must be a safe integer greater than zero.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to validate: the default concurrency applies.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
275
/**
 * Pool information snapshot computed from the worker nodes usage.
 *
 * `utilization`, `runTime` and `waitTime` are only present when the current
 * worker choice strategy requires the corresponding statistics. The average
 * run time and wait time are reported as `0` (not `NaN`) while no task has
 * been executed yet.
 *
 * @inheritDoc
 */
public get info (): PoolInfo {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a numeric value picked on each worker node.
  const sumOverWorkerNodes = (
    select: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + select(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) =>
        predicate(workerNode) ? accumulator + 1 : accumulator,
      0
    )
  // Builds the pool wide statistics of the run time or wait time measurement.
  const buildTimeStatistics = (
    kind: 'runTime' | 'waitTime',
    withMedian: boolean
  ): { minimum: number, maximum: number, average: number, median?: number } => {
    const aggregate = sumOverWorkerNodes(
      workerNode => workerNode.usage[kind]?.aggregate ?? 0
    )
    const executedTasks = sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.executed ?? 0
    )
    return {
      minimum: round(
        Math.min(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[kind]?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        Math.max(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[kind]?.maximum ?? -Infinity
          )
        )
      ),
      // Guard the division: 0 / 0 would otherwise report NaN on an idle pool.
      average: round(executedTasks === 0 ? 0 : aggregate / executedTasks),
      ...(withMedian && {
        median: round(
          median(
            this.workerNodes.map(
              workerNode => workerNode.usage[kind]?.median ?? 0
            )
          )
        )
      })
    }
  }
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(requirements.runTime.aggregate &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(requirements.runTime.aggregate && {
      runTime: buildTimeStatistics('runTime', requirements.runTime.median)
    }),
    ...(requirements.waitTime.aggregate && {
      waitTime: buildTimeStatistics('waitTime', requirements.waitTime.median)
    })
  }
}
412
/**
 * Whether the pool is starting: it has less worker nodes than its minimum
 * size, or at least one of its worker nodes is not ready.
 */
private get starting (): boolean {
  if (this.workerNodes.length < this.minSize) {
    return true
  }
  return (
    this.workerNodes.length >= this.minSize &&
    this.workerNodes.some(workerNode => !workerNode.info.ready)
  )
}
420
/**
 * Whether the pool is ready: it has at least its minimum size of worker nodes
 * and all of them are ready.
 */
private get ready (): boolean {
  if (this.workerNodes.length >= this.minSize) {
    return this.workerNodes.every(workerNode => workerNode.info.ready)
  }
  return false
}
427
/**
 * Gets the approximate pool utilization: the aggregated tasks run time and
 * wait time divided by the time elapsed since the pool start multiplied by
 * the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  // Both totals are accumulated separately, then added up.
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
448
/**
 * Pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * Compared against `PoolTypes.fixed` and `PoolTypes.dynamic` in this class to
 * validate the pool size and to decide on dynamic worker creation.
 */
protected abstract get type (): PoolType
455
/**
 * Gets the worker type.
 *
 * Reported as the `worker` property of the pool information.
 */
protected abstract get worker (): WorkerType
460
/**
 * Pool minimum size.
 *
 * The pool is considered starting while it has less worker nodes than this size.
 */
protected abstract get minSize (): number
465
/**
 * Pool maximum size.
 *
 * The pool is considered full once it has at least this number of worker nodes.
 */
protected abstract get maxSize (): number
470
/**
 * Looks up a worker from its id.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    workerNode => workerNode.info.id === workerId
  )
  return matchingWorkerNode?.worker
}
481
/**
 * Checks that the worker id carried by a message received from a worker,
 * if any, belongs to a worker of this pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    // Messages without a worker id are not checked.
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
498
/**
 * Gets the worker node key of the given worker, i.e. its index in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
510
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and set up its statistics again.
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
529
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Store the validated options, then propagate them to the strategy context.
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
540
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Flush the tasks queues before the queueing gets switched off.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
552
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const effectiveTasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = effectiveTasksQueueOptions
}
563
/**
 * Builds the effective tasks queue options: the concurrency defaults to `1` when not given.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user.
 * @returns The effective tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
571
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes
 * has reached the pool maximum size.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return this.maxSize <= workerNodesCount
}
580
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 *
 * When the tasks queue is enabled, a busy pool makes `execute()` enqueue the
 * submitted task instead of executing it right away. It also triggers the
 * `busy` pool event.
 */
protected abstract get busy (): boolean
587
/**
 * Whether all the worker nodes are executing at least one task,
 * i.e. no worker node has zero executing task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
600
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name: name ?? DEFAULT_TASK_NAME,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    workerId: this.getWorkerInfo(workerNodeKey).id as number,
    id: randomUUID()
  }
  // Register the promise callbacks so that the worker response can settle it.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // Queue the task if queueing is enabled and the pool is busy or the chosen
  // worker node has reached its tasks concurrency.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (!shallEnqueue) {
    this.executeTask(workerNodeKey, task)
  } else {
    this.enqueueTask(workerNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
635
/** @inheritDoc */
public async destroy (): Promise<void> {
  const workerShutdowns = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // Listen to the worker exit before asking for its termination.
      const workerExited = new Promise<void>(resolve => {
        workerNode.worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await workerExited
    }
  )
  await Promise.all(workerShutdowns)
}
652
/**
 * Terminates the given worker.
 *
 * Called for each worker node when the pool is destroyed, and for a dynamic
 * worker when its kill message is received.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
659
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden.
 *
 * The constructor calls it after the worker choice strategy context is created.
 * The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
669
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws if this returns `false`: a pool cannot be started from a worker.
 */
protected abstract isMain (): boolean
674
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics,
 * for the worker node usage first, then for its usage of the given task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const nodeUsage = workerNode.usage
  ++nodeUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(nodeUsage, task)
  const perTaskUsage = workerNode.getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  ++perTaskUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(perTaskUsage, task)
}
695
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics, for the worker node usage
 * first, then for its usage of the executed task name.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  const nodeUsage = workerNode.usage
  this.updateTaskStatisticsWorkerUsage(nodeUsage, message)
  this.updateRunTimeWorkerUsage(nodeUsage, message)
  this.updateEluWorkerUsage(nodeUsage, message)
  const taskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  const perTaskUsage = workerNode.getTaskWorkerUsage(taskName) as WorkerUsage
  this.updateTaskStatisticsWorkerUsage(perTaskUsage, message)
  this.updateRunTimeWorkerUsage(perTaskUsage, message)
  this.updateEluWorkerUsage(perTaskUsage, message)
}
719
/**
 * Updates the tasks counters of a worker usage once a task response is received:
 * one task less executing and one more task either failed or executed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
732
/**
 * Updates the run time statistics of a worker usage, if the worker choice
 * strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const { runTime } = workerUsage
  // A missing task run time counts as zero.
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  runTime.aggregate = (runTime.aggregate ?? 0) + taskRunTime
  runTime.minimum = Math.min(taskRunTime, runTime?.minimum ?? Infinity)
  runTime.maximum = Math.max(taskRunTime, runTime?.maximum ?? -Infinity)
  if (runTimeRequirements.average && workerUsage.tasks.executed !== 0) {
    runTime.average = runTime.aggregate / workerUsage.tasks.executed
  }
  if (
    runTimeRequirements.median &&
    message.taskPerformance?.runTime != null
  ) {
    runTime.history.push(message.taskPerformance.runTime)
    runTime.median = median(runTime.history)
  }
}
770
/**
 * Updates the wait time statistics of a worker usage, if the worker choice
 * strategy requires the wait time aggregate. The task wait time is the time
 * elapsed since the task timestamp, zero if the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  const { waitTime } = workerUsage
  waitTime.aggregate = (waitTime?.aggregate ?? 0) + taskWaitTime
  waitTime.minimum = Math.min(taskWaitTime, waitTime?.minimum ?? Infinity)
  waitTime.maximum = Math.max(taskWaitTime, waitTime?.maximum ?? -Infinity)
  if (waitTimeRequirements.average && workerUsage.tasks.executed !== 0) {
    waitTime.average = waitTime.aggregate / workerUsage.tasks.executed
  }
  if (waitTimeRequirements.median) {
    waitTime.history.push(taskWaitTime)
    waitTime.median = median(waitTime.history)
  }
}
808
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage, if
 * the worker choice strategy requires the ELU aggregate and the message
 * carries a task ELU.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  const { elu } = workerUsage
  elu.idle.aggregate = (elu.idle?.aggregate ?? 0) + taskElu.idle
  elu.active.aggregate = (elu.active?.aggregate ?? 0) + taskElu.active
  // The utilization is averaged with the previous value, when there is one.
  elu.utilization =
    elu.utilization != null
      ? (elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
  elu.idle.minimum = Math.min(taskElu.idle, elu.idle?.minimum ?? Infinity)
  elu.idle.maximum = Math.max(taskElu.idle, elu.idle?.maximum ?? -Infinity)
  elu.active.minimum = Math.min(
    taskElu.active,
    elu.active?.minimum ?? Infinity
  )
  elu.active.maximum = Math.max(
    taskElu.active,
    elu.active?.maximum ?? -Infinity
  )
  if (eluRequirements.average && workerUsage.tasks.executed !== 0) {
    const executedTasks = workerUsage.tasks.executed
    elu.idle.average = elu.idle.aggregate / executedTasks
    elu.active.average = elu.active.aggregate / executedTasks
  }
  if (eluRequirements.median) {
    elu.idle.history.push(taskElu.idle)
    elu.active.history.push(taskElu.active)
    elu.idle.median = median(elu.idle.history)
    elu.active.median = median(elu.active.history)
  }
}
872
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions are met; it is chosen
 * right away if the strategy policy says so. Otherwise the worker choice
 * strategy picks the worker node.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  return strategyPolicy.useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
891
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full
 * and all its worker nodes are executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
900
/**
 * Sends a message to the given worker.
 *
 * Used in this class to send the startup message to a newly set up worker and
 * the `checkAlive` message to a dynamic worker.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
911
/**
 * Creates a new worker.
 *
 * Called by `createAndSetupWorker()`, which then registers the event handlers
 * and adds the worker to the pool worker nodes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
918
/**
 * Creates a new worker and sets it up completely in the pool worker nodes:
 * user and internal event handlers are registered, the worker is added to
 * the worker nodes and the after worker setup hook is run.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    // A worker in error is not ready anymore.
    workerInfo.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart) {
      // Replace the worker by one of the same kind.
      if (!workerInfo.dynamic) {
        this.createAndSetupWorker()
      } else {
        this.createAndSetupDynamicWorker()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker is removed from the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
959
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular worker setup, a message listener destroys the worker
 * when it sends a kill message, the worker information is flagged as dynamic
 * and a `checkAlive` message is sent to the worker.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Whether the worker has no task left to process.
    const hasNoPendingTask = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && hasNoPendingTask())
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  workerInfo.dynamic = true
  this.sendToWorker(worker, {
    checkAlive: true,
    workerId: workerInfo.id as number
  })
  return worker
}
990
/**
 * Registers a `'message'` event listener callback on the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
1003
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
  // Send startup message to worker.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(worker, { ready: false, workerId })
  // Setup worker task statistics computation.
  this.setWorkerStatistics(worker)
}
1021
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * Each task goes to the first other ready worker node without queued tasks,
 * or else to the other ready worker node with the fewest queued tasks.
 * The redistribution stops when no other ready worker node is available:
 * re-enqueueing the tasks on the source worker node would never empty its
 * queue and would loop forever.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other worker nodes that are ready are eligible targets.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // Cannot do better than an empty queue.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No eligible target worker node: leave the remaining tasks in place.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1051
/**
 * This function is the listener registered for each worker message.
 *
 * The returned listener checks the message worker id, then handles the
 * message either as a worker ready message or as a task execution response.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const isWorkerReadyMessage =
      message.ready != null && message.workerId != null
    if (isWorkerReadyMessage) {
      // Worker ready message received
      this.handleWorkerReadyMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1069
/**
 * Handles a worker ready message: stores the `ready` flag of the message in the information
 * of the worker identified by the message worker id, then emits the pool `ready` event
 * if an emitter exists and the pool reports itself as ready.
 *
 * @param message - The worker ready message.
 */
private handleWorkerReadyMessage (message: MessageValue<Response>): void {
  const readyWorker = this.getWorkerById(message.workerId) as Worker
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(readyWorker))
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1078
/**
 * Handles a task execution response: settles the promise registered under the message id,
 * runs the after task execution hook, forgets the promise, starts the next queued task
 * of the worker node when tasks queueing is enabled, and updates the worker choice strategy.
 *
 * A message whose id has no registered promise is ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.taskError, taskError)
    }
    // The promise is rejected with the error message, not with the task error object.
    promiseResponse.reject(taskError.message)
  }
  const { worker } = promiseResponse
  this.afterTaskExecutionHook(worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1105
/**
 * Emits the pool `busy` event when the pool is busy and, for a dynamic pool,
 * the `full` event when the pool is full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicPool = this.type === PoolTypes.dynamic
  if (isDynamicPool && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1116
/**
 * Gets the information of the worker held by the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The `info` object of the worker node.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1125
/**
 * Wraps the given worker in a new worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length after the insertion.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode = new WorkerNode(worker, this.worker)
  return this.workerNodes.push(workerNode)
}
1135
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context.
 *
 * Nothing is removed when the worker node key lookup yields -1.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1148
/**
 * Runs the before task execution hook, then sends the task to the worker
 * of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1153
/**
 * Enqueues the task on the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The number returned by the worker node `enqueueTask()` (presumably the tasks queue size).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1157
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node yields none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1161
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1165
/**
 * Executes every task queued on the worker node at the given key, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
    // Re-read the size on each pass rather than counting down.
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1175
/**
 * Flushes the tasks queue of every worker node of the pool, in worker node key order.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1181
/**
 * Sends to the given worker the task statistics settings, i.e. the `aggregate` flags of the
 * `runTime` and `elu` task statistics requirements of the worker choice strategy context,
 * together with the worker id.
 *
 * @param worker - The worker to send the statistics settings to.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(worker, {
    statistics: { runTime, elu },
    workerId
  })
}
1194 }