feat: make the remaining worker choice strategies worker readiness aware
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import { existsSync } from 'node:fs'
4 import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8 } from '../utility-types'
9 import {
10 DEFAULT_TASK_NAME,
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
13 isKillBehavior,
14 isPlainObject,
15 median,
16 round
17 } from '../utils'
18 import { KillBehaviors } from '../worker/worker-options'
19 import {
20 type IPool,
21 PoolEmitter,
22 PoolEvents,
23 type PoolInfo,
24 type PoolOptions,
25 type PoolType,
26 PoolTypes,
27 type TasksQueueOptions
28 } from './pool'
29 import type {
30 IWorker,
31 IWorkerNode,
32 MessageHandler,
33 WorkerInfo,
34 WorkerType,
35 WorkerUsage
36 } from './worker'
37 import {
38 Measurements,
39 WorkerChoiceStrategies,
40 type WorkerChoiceStrategy,
41 type WorkerChoiceStrategyOptions
42 } from './selection-strategies/selection-strategies-types'
43 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
44 import { version } from './version'
45 import { WorkerNode } from './worker-node'
46
47 /**
48 * Base class that implements some shared logic for all poolifier pools.
49 *
50 * @typeParam Worker - Type of worker which manages this pool.
51 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
52 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
53 */
54 export abstract class AbstractPool<
55 Worker extends IWorker,
56 Data = unknown,
57 Response = unknown
58 > implements IPool<Worker, Data, Response> {
59 /** @inheritDoc */
60 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
61
62 /** @inheritDoc */
63 public readonly emitter?: PoolEmitter
64
65 /**
66 * The execution response promise map.
67 *
68 * - `key`: The message id of each submitted task.
69 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
70 *
71 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
72 */
73 protected promiseResponseMap: Map<
74 string,
75 PromiseResponseWrapper<Worker, Response>
76 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
77
78 /**
79 * Worker choice strategy context referencing a worker choice algorithm implementation.
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
82 Worker,
83 Data,
84 Response
85 >
86
87 /**
88 * The start timestamp of the pool.
89 */
90 private readonly startTimestamp
91
92 /**
93 * Constructs a new poolifier pool.
94 *
95 * @param numberOfWorkers - Number of workers that this pool should manage.
96 * @param filePath - Path to the worker file.
97 * @param opts - Options for the pool.
98 */
99 public constructor (
100 protected readonly numberOfWorkers: number,
101 protected readonly filePath: string,
102 protected readonly opts: PoolOptions<Worker>
103 ) {
104 if (!this.isMain()) {
105 throw new Error('Cannot start a pool from a worker!')
106 }
107 this.checkNumberOfWorkers(this.numberOfWorkers)
108 this.checkFilePath(this.filePath)
109 this.checkPoolOptions(this.opts)
110
111 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
112 this.executeTask = this.executeTask.bind(this)
113 this.enqueueTask = this.enqueueTask.bind(this)
114 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
115
116 if (this.opts.enableEvents === true) {
117 this.emitter = new PoolEmitter()
118 }
119 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
120 Worker,
121 Data,
122 Response
123 >(
124 this,
125 this.opts.workerChoiceStrategy,
126 this.opts.workerChoiceStrategyOptions
127 )
128
129 this.setupHook()
130
131 while (this.workerNodes.length < this.numberOfWorkers) {
132 this.createAndSetupWorker()
133 }
134
135 this.startTimestamp = performance.now()
136 }
137
138 private checkFilePath (filePath: string): void {
139 if (
140 filePath == null ||
141 typeof filePath !== 'string' ||
142 (typeof filePath === 'string' && filePath.trim().length === 0)
143 ) {
144 throw new Error('Please specify a file with a worker implementation')
145 }
146 if (!existsSync(filePath)) {
147 throw new Error(`Cannot find the worker file '${filePath}'`)
148 }
149 }
150
151 private checkNumberOfWorkers (numberOfWorkers: number): void {
152 if (numberOfWorkers == null) {
153 throw new Error(
154 'Cannot instantiate a pool without specifying the number of workers'
155 )
156 } else if (!Number.isSafeInteger(numberOfWorkers)) {
157 throw new TypeError(
158 'Cannot instantiate a pool with a non safe integer number of workers'
159 )
160 } else if (numberOfWorkers < 0) {
161 throw new RangeError(
162 'Cannot instantiate a pool with a negative number of workers'
163 )
164 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
165 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
166 }
167 }
168
169 protected checkDynamicPoolSize (min: number, max: number): void {
170 if (this.type === PoolTypes.dynamic) {
171 if (min > max) {
172 throw new RangeError(
173 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
174 )
175 } else if (min === 0 && max === 0) {
176 throw new RangeError(
177 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
178 )
179 } else if (min === max) {
180 throw new RangeError(
181 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
182 )
183 }
184 }
185 }
186
187 private checkPoolOptions (opts: PoolOptions<Worker>): void {
188 if (isPlainObject(opts)) {
189 this.opts.workerChoiceStrategy =
190 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
191 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
192 this.opts.workerChoiceStrategyOptions =
193 opts.workerChoiceStrategyOptions ??
194 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
195 this.checkValidWorkerChoiceStrategyOptions(
196 this.opts.workerChoiceStrategyOptions
197 )
198 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
199 this.opts.enableEvents = opts.enableEvents ?? true
200 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
201 if (this.opts.enableTasksQueue) {
202 this.checkValidTasksQueueOptions(
203 opts.tasksQueueOptions as TasksQueueOptions
204 )
205 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
206 opts.tasksQueueOptions as TasksQueueOptions
207 )
208 }
209 } else {
210 throw new TypeError('Invalid pool options: must be a plain object')
211 }
212 }
213
214 private checkValidWorkerChoiceStrategy (
215 workerChoiceStrategy: WorkerChoiceStrategy
216 ): void {
217 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
218 throw new Error(
219 `Invalid worker choice strategy '${workerChoiceStrategy}'`
220 )
221 }
222 }
223
224 private checkValidWorkerChoiceStrategyOptions (
225 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
226 ): void {
227 if (!isPlainObject(workerChoiceStrategyOptions)) {
228 throw new TypeError(
229 'Invalid worker choice strategy options: must be a plain object'
230 )
231 }
232 if (
233 workerChoiceStrategyOptions.weights != null &&
234 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
235 ) {
236 throw new Error(
237 'Invalid worker choice strategy options: must have a weight for each worker node'
238 )
239 }
240 if (
241 workerChoiceStrategyOptions.measurement != null &&
242 !Object.values(Measurements).includes(
243 workerChoiceStrategyOptions.measurement
244 )
245 ) {
246 throw new Error(
247 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
248 )
249 }
250 }
251
252 private checkValidTasksQueueOptions (
253 tasksQueueOptions: TasksQueueOptions
254 ): void {
255 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
256 throw new TypeError('Invalid tasks queue options: must be a plain object')
257 }
258 if (
259 tasksQueueOptions?.concurrency != null &&
260 !Number.isSafeInteger(tasksQueueOptions.concurrency)
261 ) {
262 throw new TypeError(
263 'Invalid worker tasks concurrency: must be an integer'
264 )
265 }
266 if (
267 tasksQueueOptions?.concurrency != null &&
268 tasksQueueOptions.concurrency <= 0
269 ) {
270 throw new Error(
271 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
272 )
273 }
274 }
275
276 /** @inheritDoc */
277 public get info (): PoolInfo {
278 return {
279 version,
280 type: this.type,
281 worker: this.worker,
282 ready: this.ready,
283 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
284 minSize: this.minSize,
285 maxSize: this.maxSize,
286 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
287 .runTime.aggregate &&
288 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
289 .waitTime.aggregate && { utilization: round(this.utilization) }),
290 workerNodes: this.workerNodes.length,
291 idleWorkerNodes: this.workerNodes.reduce(
292 (accumulator, workerNode) =>
293 workerNode.usage.tasks.executing === 0
294 ? accumulator + 1
295 : accumulator,
296 0
297 ),
298 busyWorkerNodes: this.workerNodes.reduce(
299 (accumulator, workerNode) =>
300 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
301 0
302 ),
303 executedTasks: this.workerNodes.reduce(
304 (accumulator, workerNode) =>
305 accumulator + workerNode.usage.tasks.executed,
306 0
307 ),
308 executingTasks: this.workerNodes.reduce(
309 (accumulator, workerNode) =>
310 accumulator + workerNode.usage.tasks.executing,
311 0
312 ),
313 queuedTasks: this.workerNodes.reduce(
314 (accumulator, workerNode) =>
315 accumulator + workerNode.usage.tasks.queued,
316 0
317 ),
318 maxQueuedTasks: this.workerNodes.reduce(
319 (accumulator, workerNode) =>
320 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
321 0
322 ),
323 failedTasks: this.workerNodes.reduce(
324 (accumulator, workerNode) =>
325 accumulator + workerNode.usage.tasks.failed,
326 0
327 ),
328 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
329 .runTime.aggregate && {
330 runTime: {
331 minimum: round(
332 Math.min(
333 ...this.workerNodes.map(
334 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
335 )
336 )
337 ),
338 maximum: round(
339 Math.max(
340 ...this.workerNodes.map(
341 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
342 )
343 )
344 ),
345 average: round(
346 this.workerNodes.reduce(
347 (accumulator, workerNode) =>
348 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
349 0
350 ) /
351 this.workerNodes.reduce(
352 (accumulator, workerNode) =>
353 accumulator + (workerNode.usage.tasks?.executed ?? 0),
354 0
355 )
356 ),
357 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
358 .runTime.median && {
359 median: round(
360 median(
361 this.workerNodes.map(
362 workerNode => workerNode.usage.runTime?.median ?? 0
363 )
364 )
365 )
366 })
367 }
368 }),
369 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
370 .waitTime.aggregate && {
371 waitTime: {
372 minimum: round(
373 Math.min(
374 ...this.workerNodes.map(
375 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
376 )
377 )
378 ),
379 maximum: round(
380 Math.max(
381 ...this.workerNodes.map(
382 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
383 )
384 )
385 ),
386 average: round(
387 this.workerNodes.reduce(
388 (accumulator, workerNode) =>
389 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
390 0
391 ) /
392 this.workerNodes.reduce(
393 (accumulator, workerNode) =>
394 accumulator + (workerNode.usage.tasks?.executed ?? 0),
395 0
396 )
397 ),
398 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
399 .waitTime.median && {
400 median: round(
401 median(
402 this.workerNodes.map(
403 workerNode => workerNode.usage.waitTime?.median ?? 0
404 )
405 )
406 )
407 })
408 }
409 })
410 }
411 }
412
413 private get starting (): boolean {
414 return this.workerNodes.length < this.minSize
415 }
416
417 private get ready (): boolean {
418 return (
419 this.workerNodes.length >= this.minSize &&
420 this.workerNodes.every(
421 (workerNode, workerNodeKey) =>
422 workerNodeKey < this.minSize && workerNode.info.ready
423 )
424 )
425 }
426
427 /**
428 * Gets the approximate pool utilization.
429 *
430 * @returns The pool utilization.
431 */
432 private get utilization (): number {
433 const poolTimeCapacity =
434 (performance.now() - this.startTimestamp) * this.maxSize
435 const totalTasksRunTime = this.workerNodes.reduce(
436 (accumulator, workerNode) =>
437 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
438 0
439 )
440 const totalTasksWaitTime = this.workerNodes.reduce(
441 (accumulator, workerNode) =>
442 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
443 0
444 )
445 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
446 }
447
448 /**
449 * Pool type.
450 *
451 * If it is `'dynamic'`, it provides the `max` property.
452 */
453 protected abstract get type (): PoolType
454
455 /**
456 * Gets the worker type.
457 */
458 protected abstract get worker (): WorkerType
459
460 /**
461 * Pool minimum size.
462 */
463 protected abstract get minSize (): number
464
465 /**
466 * Pool maximum size.
467 */
468 protected abstract get maxSize (): number
469
470 /**
471 * Get the worker given its id.
472 *
473 * @param workerId - The worker id.
474 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
475 */
476 private getWorkerById (workerId: number): Worker | undefined {
477 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
478 ?.worker
479 }
480
481 /**
482 * Checks if the worker id sent in the received message from a worker is valid.
483 *
484 * @param message - The received message.
485 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
486 */
487 private checkMessageWorkerId (message: MessageValue<Response>): void {
488 if (
489 message.workerId != null &&
490 this.getWorkerById(message.workerId) == null
491 ) {
492 throw new Error(
493 `Worker message received from unknown worker '${message.workerId}'`
494 )
495 }
496 }
497
498 /**
499 * Gets the given worker its worker node key.
500 *
501 * @param worker - The worker.
502 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
503 */
504 private getWorkerNodeKey (worker: Worker): number {
505 return this.workerNodes.findIndex(
506 workerNode => workerNode.worker === worker
507 )
508 }
509
510 /** @inheritDoc */
511 public setWorkerChoiceStrategy (
512 workerChoiceStrategy: WorkerChoiceStrategy,
513 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
514 ): void {
515 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
516 this.opts.workerChoiceStrategy = workerChoiceStrategy
517 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
518 this.opts.workerChoiceStrategy
519 )
520 if (workerChoiceStrategyOptions != null) {
521 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
522 }
523 for (const workerNode of this.workerNodes) {
524 workerNode.resetUsage()
525 this.setWorkerStatistics(workerNode.worker)
526 }
527 }
528
529 /** @inheritDoc */
530 public setWorkerChoiceStrategyOptions (
531 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
532 ): void {
533 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
534 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
535 this.workerChoiceStrategyContext.setOptions(
536 this.opts.workerChoiceStrategyOptions
537 )
538 }
539
540 /** @inheritDoc */
541 public enableTasksQueue (
542 enable: boolean,
543 tasksQueueOptions?: TasksQueueOptions
544 ): void {
545 if (this.opts.enableTasksQueue === true && !enable) {
546 this.flushTasksQueues()
547 }
548 this.opts.enableTasksQueue = enable
549 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
550 }
551
552 /** @inheritDoc */
553 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
554 if (this.opts.enableTasksQueue === true) {
555 this.checkValidTasksQueueOptions(tasksQueueOptions)
556 this.opts.tasksQueueOptions =
557 this.buildTasksQueueOptions(tasksQueueOptions)
558 } else if (this.opts.tasksQueueOptions != null) {
559 delete this.opts.tasksQueueOptions
560 }
561 }
562
563 private buildTasksQueueOptions (
564 tasksQueueOptions: TasksQueueOptions
565 ): TasksQueueOptions {
566 return {
567 concurrency: tasksQueueOptions?.concurrency ?? 1
568 }
569 }
570
571 /**
572 * Whether the pool is full or not.
573 *
574 * The pool filling boolean status.
575 */
576 protected get full (): boolean {
577 return this.workerNodes.length >= this.maxSize
578 }
579
580 /**
581 * Whether the pool is busy or not.
582 *
583 * The pool busyness boolean status.
584 */
585 protected abstract get busy (): boolean
586
587 /**
588 * Whether worker nodes are executing at least one task.
589 *
590 * @returns Worker nodes busyness boolean status.
591 */
592 protected internalBusy (): boolean {
593 return (
594 this.workerNodes.findIndex(workerNode => {
595 return workerNode.usage.tasks.executing === 0
596 }) === -1
597 )
598 }
599
600 /** @inheritDoc */
601 public async execute (data?: Data, name?: string): Promise<Response> {
602 const timestamp = performance.now()
603 const workerNodeKey = this.chooseWorkerNode()
604 const submittedTask: Task<Data> = {
605 name: name ?? DEFAULT_TASK_NAME,
606 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
607 data: data ?? ({} as Data),
608 timestamp,
609 workerId: this.getWorkerInfo(workerNodeKey).id as number,
610 id: randomUUID()
611 }
612 const res = new Promise<Response>((resolve, reject) => {
613 this.promiseResponseMap.set(submittedTask.id as string, {
614 resolve,
615 reject,
616 worker: this.workerNodes[workerNodeKey].worker
617 })
618 })
619 if (
620 this.opts.enableTasksQueue === true &&
621 (this.busy ||
622 this.workerNodes[workerNodeKey].usage.tasks.executing >=
623 ((this.opts.tasksQueueOptions as TasksQueueOptions)
624 .concurrency as number))
625 ) {
626 this.enqueueTask(workerNodeKey, submittedTask)
627 } else {
628 this.executeTask(workerNodeKey, submittedTask)
629 }
630 this.checkAndEmitEvents()
631 // eslint-disable-next-line @typescript-eslint/return-await
632 return res
633 }
634
635 /** @inheritDoc */
636 public async destroy (): Promise<void> {
637 await Promise.all(
638 this.workerNodes.map(async (workerNode, workerNodeKey) => {
639 this.flushTasksQueue(workerNodeKey)
640 // FIXME: wait for tasks to be finished
641 const workerExitPromise = new Promise<void>(resolve => {
642 workerNode.worker.on('exit', () => {
643 resolve()
644 })
645 })
646 await this.destroyWorker(workerNode.worker)
647 await workerExitPromise
648 })
649 )
650 }
651
652 /**
653 * Terminates the given worker.
654 *
655 * @param worker - A worker within `workerNodes`.
656 */
657 protected abstract destroyWorker (worker: Worker): void | Promise<void>
658
659 /**
660 * Setup hook to execute code before worker nodes are created in the abstract constructor.
661 * Can be overridden.
662 *
663 * @virtual
664 */
665 protected setupHook (): void {
666 // Intentionally empty
667 }
668
669 /**
670 * Should return whether the worker is the main worker or not.
671 */
672 protected abstract isMain (): boolean
673
674 /**
675 * Hook executed before the worker task execution.
676 * Can be overridden.
677 *
678 * @param workerNodeKey - The worker node key.
679 * @param task - The task to execute.
680 */
681 protected beforeTaskExecutionHook (
682 workerNodeKey: number,
683 task: Task<Data>
684 ): void {
685 const workerUsage = this.workerNodes[workerNodeKey].usage
686 ++workerUsage.tasks.executing
687 this.updateWaitTimeWorkerUsage(workerUsage, task)
688 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
689 task.name as string
690 ) as WorkerUsage
691 ++taskWorkerUsage.tasks.executing
692 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
693 }
694
695 /**
696 * Hook executed after the worker task execution.
697 * Can be overridden.
698 *
699 * @param worker - The worker.
700 * @param message - The received message.
701 */
702 protected afterTaskExecutionHook (
703 worker: Worker,
704 message: MessageValue<Response>
705 ): void {
706 const workerNodeKey = this.getWorkerNodeKey(worker)
707 const workerUsage = this.workerNodes[workerNodeKey].usage
708 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
709 this.updateRunTimeWorkerUsage(workerUsage, message)
710 this.updateEluWorkerUsage(workerUsage, message)
711 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
712 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
713 ) as WorkerUsage
714 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
715 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
716 this.updateEluWorkerUsage(taskWorkerUsage, message)
717 }
718
719 private updateTaskStatisticsWorkerUsage (
720 workerUsage: WorkerUsage,
721 message: MessageValue<Response>
722 ): void {
723 const workerTaskStatistics = workerUsage.tasks
724 --workerTaskStatistics.executing
725 if (message.taskError == null) {
726 ++workerTaskStatistics.executed
727 } else {
728 ++workerTaskStatistics.failed
729 }
730 }
731
732 private updateRunTimeWorkerUsage (
733 workerUsage: WorkerUsage,
734 message: MessageValue<Response>
735 ): void {
736 if (
737 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
738 .aggregate
739 ) {
740 const taskRunTime = message.taskPerformance?.runTime ?? 0
741 workerUsage.runTime.aggregate =
742 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
743 workerUsage.runTime.minimum = Math.min(
744 taskRunTime,
745 workerUsage.runTime?.minimum ?? Infinity
746 )
747 workerUsage.runTime.maximum = Math.max(
748 taskRunTime,
749 workerUsage.runTime?.maximum ?? -Infinity
750 )
751 if (
752 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
753 .average &&
754 workerUsage.tasks.executed !== 0
755 ) {
756 workerUsage.runTime.average =
757 workerUsage.runTime.aggregate / workerUsage.tasks.executed
758 }
759 if (
760 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
761 .median &&
762 message.taskPerformance?.runTime != null
763 ) {
764 workerUsage.runTime.history.push(message.taskPerformance.runTime)
765 workerUsage.runTime.median = median(workerUsage.runTime.history)
766 }
767 }
768 }
769
770 private updateWaitTimeWorkerUsage (
771 workerUsage: WorkerUsage,
772 task: Task<Data>
773 ): void {
774 const timestamp = performance.now()
775 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
776 if (
777 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
778 .aggregate
779 ) {
780 workerUsage.waitTime.aggregate =
781 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
782 workerUsage.waitTime.minimum = Math.min(
783 taskWaitTime,
784 workerUsage.waitTime?.minimum ?? Infinity
785 )
786 workerUsage.waitTime.maximum = Math.max(
787 taskWaitTime,
788 workerUsage.waitTime?.maximum ?? -Infinity
789 )
790 if (
791 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
792 .waitTime.average &&
793 workerUsage.tasks.executed !== 0
794 ) {
795 workerUsage.waitTime.average =
796 workerUsage.waitTime.aggregate / workerUsage.tasks.executed
797 }
798 if (
799 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
800 .waitTime.median
801 ) {
802 workerUsage.waitTime.history.push(taskWaitTime)
803 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
804 }
805 }
806 }
807
808 private updateEluWorkerUsage (
809 workerUsage: WorkerUsage,
810 message: MessageValue<Response>
811 ): void {
812 if (
813 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
814 .aggregate
815 ) {
816 if (message.taskPerformance?.elu != null) {
817 workerUsage.elu.idle.aggregate =
818 (workerUsage.elu.idle?.aggregate ?? 0) +
819 message.taskPerformance.elu.idle
820 workerUsage.elu.active.aggregate =
821 (workerUsage.elu.active?.aggregate ?? 0) +
822 message.taskPerformance.elu.active
823 if (workerUsage.elu.utilization != null) {
824 workerUsage.elu.utilization =
825 (workerUsage.elu.utilization +
826 message.taskPerformance.elu.utilization) /
827 2
828 } else {
829 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
830 }
831 workerUsage.elu.idle.minimum = Math.min(
832 message.taskPerformance.elu.idle,
833 workerUsage.elu.idle?.minimum ?? Infinity
834 )
835 workerUsage.elu.idle.maximum = Math.max(
836 message.taskPerformance.elu.idle,
837 workerUsage.elu.idle?.maximum ?? -Infinity
838 )
839 workerUsage.elu.active.minimum = Math.min(
840 message.taskPerformance.elu.active,
841 workerUsage.elu.active?.minimum ?? Infinity
842 )
843 workerUsage.elu.active.maximum = Math.max(
844 message.taskPerformance.elu.active,
845 workerUsage.elu.active?.maximum ?? -Infinity
846 )
847 if (
848 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
849 .average &&
850 workerUsage.tasks.executed !== 0
851 ) {
852 workerUsage.elu.idle.average =
853 workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
854 workerUsage.elu.active.average =
855 workerUsage.elu.active.aggregate / workerUsage.tasks.executed
856 }
857 if (
858 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
859 .median
860 ) {
861 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
862 workerUsage.elu.active.history.push(
863 message.taskPerformance.elu.active
864 )
865 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
866 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
867 }
868 }
869 }
870 }
871
872 /**
873 * Chooses a worker node for the next task.
874 *
875 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
876 *
877 * @returns The worker node key
878 */
879 private chooseWorkerNode (): number {
880 if (this.shallCreateDynamicWorker()) {
881 const worker = this.createAndSetupDynamicWorker()
882 if (
883 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
884 ) {
885 return this.getWorkerNodeKey(worker)
886 }
887 }
888 return this.workerChoiceStrategyContext.execute()
889 }
890
891 /**
892 * Conditions for dynamic worker creation.
893 *
894 * @returns Whether to create a dynamic worker or not.
895 */
896 private shallCreateDynamicWorker (): boolean {
897 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
898 }
899
900 /**
901 * Sends a message to the given worker.
902 *
903 * @param worker - The worker which should receive the message.
904 * @param message - The message.
905 */
906 protected abstract sendToWorker (
907 worker: Worker,
908 message: MessageValue<Data>
909 ): void
910
911 /**
912 * Creates a new worker.
913 *
914 * @returns Newly created worker.
915 */
916 protected abstract createWorker (): Worker
917
918 /**
919 * Creates a new worker and sets it up completely in the pool worker nodes.
920 *
921 * @returns New, completely set up worker.
922 */
923 protected createAndSetupWorker (): Worker {
924 const worker = this.createWorker()
925
926 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
927 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
928 worker.on('error', error => {
929 const workerNodeKey = this.getWorkerNodeKey(worker)
930 const workerInfo = this.getWorkerInfo(workerNodeKey)
931 workerInfo.ready = false
932 if (this.emitter != null) {
933 this.emitter.emit(PoolEvents.error, error)
934 }
935 if (this.opts.restartWorkerOnError === true && !this.starting) {
936 if (workerInfo.dynamic) {
937 this.createAndSetupDynamicWorker()
938 } else {
939 this.createAndSetupWorker()
940 }
941 }
942 if (this.opts.enableTasksQueue === true) {
943 this.redistributeQueuedTasks(workerNodeKey)
944 }
945 })
946 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
947 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
948 worker.once('exit', () => {
949 this.removeWorkerNode(worker)
950 })
951
952 this.pushWorkerNode(worker)
953
954 this.afterWorkerSetup(worker)
955
956 return worker
957 }
958
959 /**
960 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
961 *
962 * @returns New, completely set up dynamic worker.
963 */
964 protected createAndSetupDynamicWorker (): Worker {
965 const worker = this.createAndSetupWorker()
966 this.registerWorkerMessageListener(worker, message => {
967 const workerNodeKey = this.getWorkerNodeKey(worker)
968 if (
969 isKillBehavior(KillBehaviors.HARD, message.kill) ||
970 (message.kill != null &&
971 ((this.opts.enableTasksQueue === false &&
972 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
973 (this.opts.enableTasksQueue === true &&
974 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
975 this.tasksQueueSize(workerNodeKey) === 0)))
976 ) {
977 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
978 void (this.destroyWorker(worker) as Promise<void>)
979 }
980 })
981 const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
982 workerInfo.ready = true
983 workerInfo.dynamic = true
984 this.sendToWorker(worker, {
985 checkAlive: true,
986 workerId: workerInfo.id as number
987 })
988 return worker
989 }
990
991 /**
992 * Registers a listener callback on the given worker.
993 *
994 * @param worker - The worker which should register a listener.
995 * @param listener - The message listener callback.
996 */
997 private registerWorkerMessageListener<Message extends Data | Response>(
998 worker: Worker,
999 listener: (message: MessageValue<Message>) => void
1000 ): void {
1001 worker.on('message', listener as MessageHandler<Worker>)
1002 }
1003
1004 /**
1005 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
1006 * Can be overridden.
1007 *
1008 * @param worker - The newly created worker.
1009 */
1010 protected afterWorkerSetup (worker: Worker): void {
1011 // Listen to worker messages.
1012 this.registerWorkerMessageListener(worker, this.workerListener())
1013 // Send startup message to worker.
1014 this.sendWorkerStartupMessage(worker)
1015 // Setup worker task statistics computation.
1016 this.setWorkerStatistics(worker)
1017 }
1018
1019 private sendWorkerStartupMessage (worker: Worker): void {
1020 this.sendToWorker(worker, {
1021 ready: false,
1022 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
1023 })
1024 }
1025
1026 private redistributeQueuedTasks (workerNodeKey: number): void {
1027 while (this.tasksQueueSize(workerNodeKey) > 0) {
1028 let targetWorkerNodeKey: number = workerNodeKey
1029 let minQueuedTasks = Infinity
1030 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1031 const workerInfo = this.getWorkerInfo(workerNodeId)
1032 if (
1033 workerNodeId !== workerNodeKey &&
1034 workerInfo.ready &&
1035 workerNode.usage.tasks.queued === 0
1036 ) {
1037 targetWorkerNodeKey = workerNodeId
1038 break
1039 }
1040 if (
1041 workerNodeId !== workerNodeKey &&
1042 workerInfo.ready &&
1043 workerNode.usage.tasks.queued < minQueuedTasks
1044 ) {
1045 minQueuedTasks = workerNode.usage.tasks.queued
1046 targetWorkerNodeKey = workerNodeId
1047 }
1048 }
1049 this.enqueueTask(
1050 targetWorkerNodeKey,
1051 this.dequeueTask(workerNodeKey) as Task<Data>
1052 )
1053 }
1054 }
1055
1056 /**
1057 * This function is the listener registered for each worker message.
1058 *
1059 * @returns The listener function to execute when a message is received from a worker.
1060 */
1061 protected workerListener (): (message: MessageValue<Response>) => void {
1062 return message => {
1063 this.checkMessageWorkerId(message)
1064 if (message.ready != null) {
1065 // Worker ready message received
1066 this.handleWorkerReadyMessage(message)
1067 } else if (message.id != null) {
1068 // Task execution response received
1069 this.handleTaskExecutionResponse(message)
1070 }
1071 }
1072 }
1073
1074 private handleWorkerReadyMessage (message: MessageValue<Response>): void {
1075 const worker = this.getWorkerById(message.workerId)
1076 this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
1077 message.ready as boolean
1078 if (this.emitter != null && this.ready) {
1079 this.emitter.emit(PoolEvents.ready, this.info)
1080 }
1081 }
1082
1083 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1084 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1085 if (promiseResponse != null) {
1086 if (message.taskError != null) {
1087 if (this.emitter != null) {
1088 this.emitter.emit(PoolEvents.taskError, message.taskError)
1089 }
1090 promiseResponse.reject(message.taskError.message)
1091 } else {
1092 promiseResponse.resolve(message.data as Response)
1093 }
1094 this.afterTaskExecutionHook(promiseResponse.worker, message)
1095 this.promiseResponseMap.delete(message.id as string)
1096 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1097 if (
1098 this.opts.enableTasksQueue === true &&
1099 this.tasksQueueSize(workerNodeKey) > 0
1100 ) {
1101 this.executeTask(
1102 workerNodeKey,
1103 this.dequeueTask(workerNodeKey) as Task<Data>
1104 )
1105 }
1106 this.workerChoiceStrategyContext.update(workerNodeKey)
1107 }
1108 }
1109
1110 private checkAndEmitEvents (): void {
1111 if (this.emitter != null) {
1112 if (this.busy) {
1113 this.emitter.emit(PoolEvents.busy, this.info)
1114 }
1115 if (this.type === PoolTypes.dynamic && this.full) {
1116 this.emitter.emit(PoolEvents.full, this.info)
1117 }
1118 }
1119 }
1120
1121 /**
1122 * Gets the worker information.
1123 *
1124 * @param workerNodeKey - The worker node key.
1125 */
1126 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1127 return this.workerNodes[workerNodeKey].info
1128 }
1129
1130 /**
1131 * Pushes the given worker in the pool worker nodes.
1132 *
1133 * @param worker - The worker.
1134 * @returns The worker nodes length.
1135 */
1136 private pushWorkerNode (worker: Worker): number {
1137 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
1138 // Flag the worker as ready at pool startup.
1139 if (this.starting) {
1140 workerNode.info.ready = true
1141 }
1142 return this.workerNodes.push(workerNode)
1143 }
1144
1145 /**
1146 * Removes the given worker from the pool worker nodes.
1147 *
1148 * @param worker - The worker.
1149 */
1150 private removeWorkerNode (worker: Worker): void {
1151 const workerNodeKey = this.getWorkerNodeKey(worker)
1152 if (workerNodeKey !== -1) {
1153 this.workerNodes.splice(workerNodeKey, 1)
1154 this.workerChoiceStrategyContext.remove(workerNodeKey)
1155 }
1156 }
1157
1158 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1159 this.beforeTaskExecutionHook(workerNodeKey, task)
1160 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1161 }
1162
1163 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1164 return this.workerNodes[workerNodeKey].enqueueTask(task)
1165 }
1166
1167 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1168 return this.workerNodes[workerNodeKey].dequeueTask()
1169 }
1170
1171 private tasksQueueSize (workerNodeKey: number): number {
1172 return this.workerNodes[workerNodeKey].tasksQueueSize()
1173 }
1174
1175 private flushTasksQueue (workerNodeKey: number): void {
1176 while (this.tasksQueueSize(workerNodeKey) > 0) {
1177 this.executeTask(
1178 workerNodeKey,
1179 this.dequeueTask(workerNodeKey) as Task<Data>
1180 )
1181 }
1182 this.workerNodes[workerNodeKey].clearTasksQueue()
1183 }
1184
1185 private flushTasksQueues (): void {
1186 for (const [workerNodeKey] of this.workerNodes.entries()) {
1187 this.flushTasksQueue(workerNodeKey)
1188 }
1189 }
1190
1191 private setWorkerStatistics (worker: Worker): void {
1192 this.sendToWorker(worker, {
1193 statistics: {
1194 runTime:
1195 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1196 .runTime.aggregate,
1197 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1198 .elu.aggregate
1199 },
1200 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
1201 })
1202 }
1203 }