feat: worker node readiness aware worker choice strategies
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type {
4 MessageValue,
5 PromiseResponseWrapper,
6 Task
7 } from '../utility-types'
8 import {
9 DEFAULT_TASK_NAME,
10 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
11 EMPTY_FUNCTION,
12 isKillBehavior,
13 isPlainObject,
14 median,
15 round
16 } from '../utils'
17 import { KillBehaviors } from '../worker/worker-options'
18 import {
19 type IPool,
20 PoolEmitter,
21 PoolEvents,
22 type PoolInfo,
23 type PoolOptions,
24 type PoolType,
25 PoolTypes,
26 type TasksQueueOptions
27 } from './pool'
28 import type {
29 IWorker,
30 IWorkerNode,
31 MessageHandler,
32 WorkerInfo,
33 WorkerType,
34 WorkerUsage
35 } from './worker'
36 import {
37 Measurements,
38 WorkerChoiceStrategies,
39 type WorkerChoiceStrategy,
40 type WorkerChoiceStrategyOptions
41 } from './selection-strategies/selection-strategies-types'
42 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
43 import { version } from './version'
44 import { WorkerNode } from './worker-node'
45
46 /**
47 * Base class that implements some shared logic for all poolifier pools.
48 *
49 * @typeParam Worker - Type of worker which manages this pool.
50 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
51 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
52 */
53 export abstract class AbstractPool<
54 Worker extends IWorker,
55 Data = unknown,
56 Response = unknown
57 > implements IPool<Worker, Data, Response> {
58 /** @inheritDoc */
59 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
60
61 /** @inheritDoc */
62 public readonly emitter?: PoolEmitter
63
64 /**
65 * The execution response promise map.
66 *
67 * - `key`: The message id of each submitted task.
68 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
69 *
70 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
71 */
72 protected promiseResponseMap: Map<
73 string,
74 PromiseResponseWrapper<Worker, Response>
75 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
76
77 /**
78 * Worker choice strategy context referencing a worker choice algorithm implementation.
79 */
80 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
81 Worker,
82 Data,
83 Response
84 >
85
86 /**
87 * The start timestamp of the pool.
88 */
89 private readonly startTimestamp
90
/**
 * Builds a new poolifier pool: validates the arguments, creates the worker
 * choice strategy context, runs the setup hook and then creates worker nodes
 * until `numberOfWorkers` of them are registered.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if `isMain()` reports that the pool is not started from the main side.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Argument validation; the order decides which error is reported first.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the internal methods to this pool instance.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  const eventsEnabled = this.opts.enableEvents === true
  if (eventsEnabled) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // Each call registers one more worker node.
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
136
/**
 * Checks that a usable worker file path has been given.
 *
 * The static type is `string`, but plain JavaScript callers can pass anything,
 * so the runtime type is verified as well: previously a non-string value such
 * as a number or an object slipped through the check.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`/`undefined`, is not a string, or is blank.
 */
private checkFilePath (filePath: string): void {
  if (
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  ) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
145
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 * @throws Error if the number is `null`/`undefined`.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
163
/**
 * Validates the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - Minimum pool size.
 * @param max - Maximum pool size.
 * @throws RangeError if `min` is greater than `max`, if both are zero, or if they are equal.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  // The "both zero" case is reported before the generic "equal sizes" case.
  if (min === 0 && max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
181
/**
 * Validates the pool options and stores the defaulted values on `this.opts`.
 *
 * @param opts - The pool options to check.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (!poolOptions.enableTasksQueue) {
    return
  }
  const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  poolOptions.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
}
208
/**
 * Checks that the given value is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
218
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not have exactly `maxSize` keys, or if `measurement` is not a `Measurements` value.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null) {
    const weightsCount = Object.keys(weights).length
    if (weightsCount !== this.maxSize) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  if (measurement != null) {
    const knownMeasurements = Object.values(Measurements)
    if (!knownMeasurements.includes(measurement)) {
      throw new Error(
        `Invalid worker choice strategy options: invalid measurement '${measurement}'`
      )
    }
  }
}
246
/**
 * Validates the tasks queue options. `null`/`undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are set but are not a plain object, or if `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
270
/**
 * @inheritDoc
 *
 * The snapshot is computed on each access from the worker nodes usage.
 * `utilization`, `runTime` and `waitTime` are only included when the current
 * worker choice strategy requires the matching statistics.
 * Minimum/maximum fall back to `Infinity`/`-Infinity` for worker nodes without
 * a recorded value. Averages are `0` (instead of `NaN`) while no task has been
 * executed yet.
 */
public get info (): PoolInfo {
  // Statistics requirements are read once instead of once per field.
  const { runTime, waitTime } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Divisor of the pool-wide averages.
  const executedTasksCount = this.workerNodes.reduce(
    (accumulator, workerNode) =>
      accumulator + (workerNode.usage.tasks?.executed ?? 0),
    0
  )
  // Guards against the 0 / 0 = NaN case of a pool that has not executed any task.
  const averageOverExecutedTasks = (aggregate: number): number =>
    executedTasksCount > 0 ? aggregate / executedTasksCount : 0
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(runTime.aggregate &&
      waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        workerNode.usage.tasks.executing === 0
          ? accumulator + 1
          : accumulator,
      0
    ),
    busyWorkerNodes: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
      0
    ),
    executedTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.executed,
      0
    ),
    executingTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.executing,
      0
    ),
    queuedTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.queued,
      0
    ),
    maxQueuedTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
      0
    ),
    failedTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.failed,
      0
    ),
    ...(runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averageOverExecutedTasks(
            this.workerNodes.reduce(
              (accumulator, workerNode) =>
                accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
              0
            )
          )
        ),
        ...(runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averageOverExecutedTasks(
            this.workerNodes.reduce(
              (accumulator, workerNode) =>
                accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
              0
            )
          )
        ),
        ...(waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
407
/**
 * Whether the pool is still starting: it has fewer worker nodes than its
 * minimum size, or at least one worker node is not ready.
 */
private get starting (): boolean {
  if (this.workerNodes.length < this.minSize) {
    return true
  }
  return this.workerNodes.some(workerNode => !workerNode.info.ready)
}
415
/**
 * Whether the pool is ready: it has at least its minimum number of worker
 * nodes and every worker node is ready.
 */
private get ready (): boolean {
  const hasEnoughWorkerNodes = this.workerNodes.length >= this.minSize
  if (!hasEnoughWorkerNodes) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.info.ready)
}
422
/**
 * Gets the approximate pool utilization: the summed tasks run time and wait
 * time of all worker nodes, divided by the time elapsed since the pool start
 * multiplied by the maximum pool size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
443
444 /**
445 * Pool type.
446 *
447 * If it is `'dynamic'`, it provides the `max` property.
448 */
449 protected abstract get type (): PoolType
450
451 /**
452 * Gets the worker type.
453 */
454 protected abstract get worker (): WorkerType
455
456 /**
457 * Pool minimum size.
458 */
459 protected abstract get minSize (): number
460
461 /**
462 * Pool maximum size.
463 */
464 protected abstract get maxSize (): number
465
/**
 * Looks up a worker from its id.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose info id matches, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
476
/**
 * Checks that a message carrying a worker id comes from a worker of this pool.
 * Messages without a worker id are accepted.
 *
 * @param message - The message received from a worker.
 * @throws Error if the worker id does not match any worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
487
/**
 * Gets the worker node key of the given worker, i.e. its index in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
499
/**
 * @inheritDoc
 *
 * After switching strategy, the usage of every worker node is reset and the
 * statistics settings are sent again to each worker.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
518
/**
 * @inheritDoc
 *
 * The options are validated, stored in the pool options and forwarded to the
 * worker choice strategy context.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
529
/**
 * @inheritDoc
 *
 * When an enabled tasks queue gets disabled, the queues of all worker nodes
 * are flushed first.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
541
/**
 * @inheritDoc
 *
 * The options are only validated and stored while the tasks queue is enabled;
 * otherwise any previously stored tasks queue options are removed.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
552
/**
 * Builds the effective tasks queue options, applying the defaults.
 *
 * @param tasksQueueOptions - The user provided tasks queue options, possibly `null`/`undefined`.
 * @returns Tasks queue options whose `concurrency` defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
560
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached its maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
569
570 /**
571 * Whether the pool is busy or not.
572 *
573 * The pool busyness boolean status.
574 */
575 protected abstract get busy (): boolean
576
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
589
/**
 * @inheritDoc
 *
 * A worker node is chosen, the task is registered in the promise response map
 * under a fresh UUID, and then either queued or sent to the worker.
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const workerNode = this.workerNodes[workerNodeKey]
  const submittedTask: Task<Data> = {
    name: name ?? DEFAULT_TASK_NAME,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    workerId: workerNode.info.id as number,
    id: randomUUID()
  }
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: workerNode.worker
    })
  })
  // Queue the task when the queue is enabled and either the pool is busy or
  // the chosen worker node already runs as many tasks as the configured concurrency.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      workerNode.usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
624
/**
 * @inheritDoc
 *
 * For each worker node: its tasks queue is flushed, the worker is destroyed
 * and its `'exit'` event is awaited. All worker nodes are handled concurrently.
 */
public async destroy (): Promise<void> {
  const destroyWorkerNode = async (
    workerNode: IWorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // The exit listener is registered before the worker gets destroyed.
    const workerExitPromise = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExitPromise
  }
  await Promise.all(this.workerNodes.map(destroyWorkerNode))
}
641
642 /**
643 * Terminates the given worker.
644 *
645 * @param worker - A worker within `workerNodes`.
646 */
647 protected abstract destroyWorker (worker: Worker): void | Promise<void>
648
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * The default implementation does nothing. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
658
659 /**
660 * Should return whether the worker is the main worker or not.
661 */
662 protected abstract isMain (): boolean
663
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time, on both
 * the worker node usage and the usage tracked for the task name.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Worker node wide usage.
  const nodeUsage = this.workerNodes[workerNodeKey].usage
  ++nodeUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(nodeUsage, task)
  // Usage per task name.
  const usagePerTask = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  ++usagePerTask.tasks.executing
  this.updateWaitTimeWorkerUsage(usagePerTask, task)
}
684
/**
 * Hook executed after the worker task execution.
 * Updates the tasks counters, run time and ELU statistics, on both the worker
 * node usage and the usage tracked for the task name.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  // Worker node wide usage.
  const nodeUsage = this.workerNodes[workerNodeKey].usage
  this.updateTaskStatisticsWorkerUsage(nodeUsage, message)
  this.updateRunTimeWorkerUsage(nodeUsage, message)
  this.updateEluWorkerUsage(nodeUsage, message)
  // Usage per task name; the default task name is used when the message has none.
  const taskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  const usagePerTask = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
    taskName
  ) as WorkerUsage
  this.updateTaskStatisticsWorkerUsage(usagePerTask, message)
  this.updateRunTimeWorkerUsage(usagePerTask, message)
  this.updateEluWorkerUsage(usagePerTask, message)
}
708
/**
 * Updates the tasks counters of the given worker usage once a task response
 * has been received: one less executing task, and one more executed or failed
 * task depending on whether the message carries a task error.
 *
 * The executing counter is never decremented below zero. Other code relies on
 * `executing === 0` to detect idle worker nodes, so a negative value would make
 * a worker node look busy forever. NOTE(review): this presumably can happen
 * when the usage is reset (see `setWorkerChoiceStrategy`) while tasks are
 * still in flight — confirm against `resetUsage()`.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const workerTaskStatistics = workerUsage.tasks
  if (workerTaskStatistics.executing > 0) {
    --workerTaskStatistics.executing
  }
  if (message.taskError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
721
/**
 * Updates the run time statistics of the given worker usage from a task
 * response. Does nothing unless the worker choice strategy requires the run
 * time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (
    !this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  ) {
    return
  }
  // A missing measurement counts as a zero run time.
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  workerUsage.runTime.aggregate =
    (workerUsage.runTime.aggregate ?? 0) + taskRunTime
  const runTimeUsage = workerUsage.runTime
  runTimeUsage.minimum = Math.min(
    taskRunTime,
    runTimeUsage?.minimum ?? Infinity
  )
  runTimeUsage.maximum = Math.max(
    taskRunTime,
    runTimeUsage?.maximum ?? -Infinity
  )
  const averageRequired =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .average
  if (averageRequired && workerUsage.tasks.executed !== 0) {
    runTimeUsage.average =
      workerUsage.runTime.aggregate / workerUsage.tasks.executed
  }
  const medianRequired =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .median
  // The history only records run times that were actually measured.
  if (medianRequired && message.taskPerformance?.runTime != null) {
    runTimeUsage.history.push(message.taskPerformance.runTime)
    runTimeUsage.median = median(runTimeUsage.history)
  }
}
759
/**
 * Updates the wait time statistics of the given worker usage for a task that
 * is about to be executed. The wait time is the time elapsed since the task
 * timestamp (zero if the task has no timestamp). Does nothing unless the
 * worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  if (
    !this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
      .aggregate
  ) {
    return
  }
  workerUsage.waitTime.aggregate =
    (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
  const waitTimeUsage = workerUsage.waitTime
  waitTimeUsage.minimum = Math.min(
    taskWaitTime,
    waitTimeUsage?.minimum ?? Infinity
  )
  waitTimeUsage.maximum = Math.max(
    taskWaitTime,
    waitTimeUsage?.maximum ?? -Infinity
  )
  const averageRequired =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
      .average
  if (averageRequired && workerUsage.tasks.executed !== 0) {
    waitTimeUsage.average =
      workerUsage.waitTime.aggregate / workerUsage.tasks.executed
  }
  const medianRequired =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
      .median
  if (medianRequired) {
    waitTimeUsage.history.push(taskWaitTime)
    waitTimeUsage.median = median(waitTimeUsage.history)
  }
}
797
/**
 * Updates the event loop utilization (ELU) statistics of the given worker
 * usage from a task response. Does nothing unless the worker choice strategy
 * requires the ELU aggregate and the message carries an ELU measurement.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (
    !this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  ) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    return
  }
  const eluUsage = workerUsage.elu
  eluUsage.idle.aggregate = (eluUsage.idle?.aggregate ?? 0) + taskElu.idle
  eluUsage.active.aggregate =
    (eluUsage.active?.aggregate ?? 0) + taskElu.active
  // The stored utilization is the mean of the previous value and the new one.
  eluUsage.utilization =
    eluUsage.utilization != null
      ? (eluUsage.utilization + taskElu.utilization) / 2
      : taskElu.utilization
  eluUsage.idle.minimum = Math.min(
    taskElu.idle,
    eluUsage.idle?.minimum ?? Infinity
  )
  eluUsage.idle.maximum = Math.max(
    taskElu.idle,
    eluUsage.idle?.maximum ?? -Infinity
  )
  eluUsage.active.minimum = Math.min(
    taskElu.active,
    eluUsage.active?.minimum ?? Infinity
  )
  eluUsage.active.maximum = Math.max(
    taskElu.active,
    eluUsage.active?.maximum ?? -Infinity
  )
  const averageRequired =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .average
  if (averageRequired && workerUsage.tasks.executed !== 0) {
    eluUsage.idle.average =
      eluUsage.idle.aggregate / workerUsage.tasks.executed
    eluUsage.active.average =
      eluUsage.active.aggregate / workerUsage.tasks.executed
  }
  const medianRequired =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .median
  if (medianRequired) {
    eluUsage.idle.history.push(taskElu.idle)
    eluUsage.active.history.push(taskElu.active)
    eluUsage.idle.median = median(eluUsage.idle.history)
    eluUsage.active.median = median(eluUsage.active.history)
  }
}
861
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and, if the
 * strategy policy asks to use dynamic workers, its worker node is chosen.
 * In every other case the worker choice strategy decides.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
880
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full
 * and all its worker nodes are executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
889
890 /**
891 * Sends a message to the given worker.
892 *
893 * @param worker - The worker which should receive the message.
894 * @param message - The message.
895 */
896 protected abstract sendToWorker (
897 worker: Worker,
898 message: MessageValue<Data>
899 ): void
900
/**
 * Registers a `'message'` listener callback on the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
913
914 /**
915 * Creates a new worker.
916 *
917 * @returns Newly created worker.
918 */
919 protected abstract createWorker (): Worker
920
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * It registers the pool message listener on the worker, sends it the startup
 * message and then its task statistics settings.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
  // Send startup message to worker.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(worker, { ready: false, workerId })
  // Setup worker task statistics computation.
  this.setWorkerStatistics(worker)
}
938
/**
 * Creates a new worker and sets it up completely in the pool worker nodes:
 * the user provided and the internal event handlers are registered, the
 * worker is pushed in the worker nodes and `afterWorkerSetup()` is run.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  // User provided handlers come first, internal ones after.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart) {
      // The replacement worker is of the same kind as the faulty one.
      const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(worker)
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.afterWorkerSetup(worker)

  return worker
}
976
/**
 * Moves the tasks queued on the given worker to the other worker nodes.
 *
 * For each queued task, the target is the first other worker node with an
 * empty queue or, failing that, the other worker node with the fewest queued
 * tasks.
 *
 * Nothing is done when the worker is no longer in the pool worker nodes.
 * When there is no other worker node to move a task to, the remaining tasks
 * are left in place: previously a task was dequeued and enqueued again on the
 * same worker node, so the loop never terminated.
 *
 * @param worker - The worker whose queued tasks shall be redistributed.
 */
private redistributeQueuedTasks (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // `-1` means that no suitable target worker node has been found.
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1004
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 * On top of the regular setup, a listener handling the worker kill messages is
 * registered, the worker node is flagged as dynamic and a `checkAlive` message
 * is sent to the worker.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // A hard kill is always honored. Any other kill message is only honored
    // when the worker node is idle, and its tasks queue is empty if the queue is enabled.
    const shallDestroy = (): boolean => {
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (message.kill == null) {
        return false
      }
      switch (this.opts.enableTasksQueue) {
        case false:
          return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
        case true:
          return (
            this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
            this.tasksQueueSize(workerNodeKey) === 0
          )
        default:
          return false
      }
    }
    if (shallDestroy()) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  workerInfo.dynamic = true
  const workerId = workerInfo.id as number
  this.sendToWorker(worker, { checkAlive: true, workerId })
  return worker
}
1035
/**
 * This function is the listener registered for each worker message.
 * The listener checks the message worker id and then dispatches the message:
 * worker ready messages (`ready` and `workerId` set) or task execution
 * responses (`id` set). Other messages are ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  const listener = (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    const isReadyMessage = message.ready != null && message.workerId != null
    if (isReadyMessage) {
      // Worker ready message received
      this.handleWorkerReadyMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
  return listener
}
1053
/**
 * Handles a worker ready message: stores the readiness in the worker node
 * info and, if events are enabled and the pool is now ready, emits the
 * `ready` pool event.
 *
 * @param message - The ready message received from the worker.
 */
private handleWorkerReadyMessage (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId)
  const workerNodeKey = this.getWorkerNodeKey(worker as Worker)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null) {
    return
  }
  if (this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1062
/**
 * Handles a task execution response: settles the promise bound to the message
 * id, updates the worker usage, runs the next queued task of the worker node
 * if any, and notifies the worker choice strategy.
 * Responses with an unknown message id are ignored.
 *
 * @param message - The task execution response received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const messageId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(messageId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    // The promise is rejected with the error message of the task error.
    promiseResponse.reject(message.taskError.message)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(messageId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
1089
/**
 * Emits the `busy` pool event if the pool is busy and the `full` pool event if
 * the pool is dynamic and full. Does nothing when events are disabled.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isFullDynamicPool = this.type === PoolTypes.dynamic && this.full
  if (isFullDynamicPool) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1100
/**
 * Gets the information of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1109
/**
 * Wraps the given worker in a new worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode = new WorkerNode(worker, this.worker)
  return this.workerNodes.push(workerNode)
}
1119
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing for an unknown worker.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1132
/**
 * Runs the before task execution hook, then sends the task to the worker of
 * the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1137
/**
 * Enqueues the task on the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1141
/**
 * Dequeues a task from the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `dequeueTask()`, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1145
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1149
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and executed, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  for (
    let remainingTasks = this.tasksQueueSize(workerNodeKey);
    remainingTasks > 0;
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1159
/**
 * Flushes the tasks queue of every worker node, in worker node key order.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1165
/**
 * Sends to the given worker which task statistics it is expected to compute
 * (run time and ELU), as required by the current worker choice strategy.
 *
 * @param worker - The worker to configure.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    .id as number
  this.sendToWorker(worker, {
    statistics: { runTime, elu },
    workerId
  })
}
1178 }