refactor: factor out code to redistribute queued tasks
[poolifier.git] / src / pools / abstract-pool.ts
1 import { randomUUID } from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11 } from '../utils'
12 import { KillBehaviors } from '../worker/worker-options'
13 import { CircularArray } from '../circular-array'
14 import { Queue } from '../queue'
15 import {
16 type IPool,
17 PoolEmitter,
18 PoolEvents,
19 type PoolInfo,
20 type PoolOptions,
21 type PoolType,
22 PoolTypes,
23 type TasksQueueOptions,
24 type WorkerType,
25 WorkerTypes
26 } from './pool'
27 import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerInfo,
32 WorkerNode,
33 WorkerUsage
34 } from './worker'
35 import {
36 Measurements,
37 WorkerChoiceStrategies,
38 type WorkerChoiceStrategy,
39 type WorkerChoiceStrategyOptions
40 } from './selection-strategies/selection-strategies-types'
41 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
42 import { version } from './version'
43
44 /**
45 * Base class that implements some shared logic for all poolifier pools.
46 *
47 * @typeParam Worker - Type of worker which manages this pool.
48 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
49 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
50 */
51 export abstract class AbstractPool<
52 Worker extends IWorker,
53 Data = unknown,
54 Response = unknown
55 > implements IPool<Worker, Data, Response> {
56 /** @inheritDoc */
57 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
58
59 /** @inheritDoc */
60 public readonly emitter?: PoolEmitter
61
62 /**
63 * The execution response promise map.
64 *
65 * - `key`: The message id of each submitted task.
66 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
67 *
68 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
69 */
70 protected promiseResponseMap: Map<
71 string,
72 PromiseResponseWrapper<Worker, Response>
73 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
74
75 /**
76 * Worker choice strategy context referencing a worker choice algorithm implementation.
77 */
78 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
79 Worker,
80 Data,
81 Response
82 >
83
84 /**
85 * The start timestamp of the pool.
86 */
87 private readonly startTimestamp
88
89 /**
90 * Constructs a new poolifier pool.
91 *
92 * @param numberOfWorkers - Number of workers that this pool should manage.
93 * @param filePath - Path to the worker file.
94 * @param opts - Options for the pool.
95 */
96 public constructor (
97 protected readonly numberOfWorkers: number,
98 protected readonly filePath: string,
99 protected readonly opts: PoolOptions<Worker>
100 ) {
101 if (!this.isMain()) {
102 throw new Error('Cannot start a pool from a worker!')
103 }
104 this.checkNumberOfWorkers(this.numberOfWorkers)
105 this.checkFilePath(this.filePath)
106 this.checkPoolOptions(this.opts)
107
108 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
109 this.executeTask = this.executeTask.bind(this)
110 this.enqueueTask = this.enqueueTask.bind(this)
111 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
112
113 if (this.opts.enableEvents === true) {
114 this.emitter = new PoolEmitter()
115 }
116 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
117 Worker,
118 Data,
119 Response
120 >(
121 this,
122 this.opts.workerChoiceStrategy,
123 this.opts.workerChoiceStrategyOptions
124 )
125
126 this.setupHook()
127
128 while (this.workerNodes.length < this.numberOfWorkers) {
129 this.createAndSetupWorker()
130 }
131
132 this.startTimestamp = performance.now()
133 }
134
135 private checkFilePath (filePath: string): void {
136 if (
137 filePath == null ||
138 (typeof filePath === 'string' && filePath.trim().length === 0)
139 ) {
140 throw new Error('Please specify a file with a worker implementation')
141 }
142 }
143
144 private checkNumberOfWorkers (numberOfWorkers: number): void {
145 if (numberOfWorkers == null) {
146 throw new Error(
147 'Cannot instantiate a pool without specifying the number of workers'
148 )
149 } else if (!Number.isSafeInteger(numberOfWorkers)) {
150 throw new TypeError(
151 'Cannot instantiate a pool with a non safe integer number of workers'
152 )
153 } else if (numberOfWorkers < 0) {
154 throw new RangeError(
155 'Cannot instantiate a pool with a negative number of workers'
156 )
157 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
158 throw new Error('Cannot instantiate a fixed pool with no worker')
159 }
160 }
161
162 private checkPoolOptions (opts: PoolOptions<Worker>): void {
163 if (isPlainObject(opts)) {
164 this.opts.workerChoiceStrategy =
165 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
166 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
167 this.opts.workerChoiceStrategyOptions =
168 opts.workerChoiceStrategyOptions ??
169 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
170 this.checkValidWorkerChoiceStrategyOptions(
171 this.opts.workerChoiceStrategyOptions
172 )
173 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
174 this.opts.enableEvents = opts.enableEvents ?? true
175 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
176 if (this.opts.enableTasksQueue) {
177 this.checkValidTasksQueueOptions(
178 opts.tasksQueueOptions as TasksQueueOptions
179 )
180 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
181 opts.tasksQueueOptions as TasksQueueOptions
182 )
183 }
184 } else {
185 throw new TypeError('Invalid pool options: must be a plain object')
186 }
187 }
188
189 private checkValidWorkerChoiceStrategy (
190 workerChoiceStrategy: WorkerChoiceStrategy
191 ): void {
192 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
193 throw new Error(
194 `Invalid worker choice strategy '${workerChoiceStrategy}'`
195 )
196 }
197 }
198
199 private checkValidWorkerChoiceStrategyOptions (
200 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
201 ): void {
202 if (!isPlainObject(workerChoiceStrategyOptions)) {
203 throw new TypeError(
204 'Invalid worker choice strategy options: must be a plain object'
205 )
206 }
207 if (
208 workerChoiceStrategyOptions.weights != null &&
209 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
210 ) {
211 throw new Error(
212 'Invalid worker choice strategy options: must have a weight for each worker node'
213 )
214 }
215 if (
216 workerChoiceStrategyOptions.measurement != null &&
217 !Object.values(Measurements).includes(
218 workerChoiceStrategyOptions.measurement
219 )
220 ) {
221 throw new Error(
222 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
223 )
224 }
225 }
226
227 private checkValidTasksQueueOptions (
228 tasksQueueOptions: TasksQueueOptions
229 ): void {
230 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
231 throw new TypeError('Invalid tasks queue options: must be a plain object')
232 }
233 if (
234 tasksQueueOptions?.concurrency != null &&
235 !Number.isSafeInteger(tasksQueueOptions.concurrency)
236 ) {
237 throw new TypeError(
238 'Invalid worker tasks concurrency: must be an integer'
239 )
240 }
241 if (
242 tasksQueueOptions?.concurrency != null &&
243 tasksQueueOptions.concurrency <= 0
244 ) {
245 throw new Error(
246 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
247 )
248 }
249 }
250
251 /** @inheritDoc */
252 public get info (): PoolInfo {
253 return {
254 version,
255 type: this.type,
256 worker: this.worker,
257 minSize: this.minSize,
258 maxSize: this.maxSize,
259 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
260 .runTime.aggregate &&
261 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
262 .waitTime.aggregate && { utilization: round(this.utilization) }),
263 workerNodes: this.workerNodes.length,
264 idleWorkerNodes: this.workerNodes.reduce(
265 (accumulator, workerNode) =>
266 workerNode.usage.tasks.executing === 0
267 ? accumulator + 1
268 : accumulator,
269 0
270 ),
271 busyWorkerNodes: this.workerNodes.reduce(
272 (accumulator, workerNode) =>
273 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
274 0
275 ),
276 executedTasks: this.workerNodes.reduce(
277 (accumulator, workerNode) =>
278 accumulator + workerNode.usage.tasks.executed,
279 0
280 ),
281 executingTasks: this.workerNodes.reduce(
282 (accumulator, workerNode) =>
283 accumulator + workerNode.usage.tasks.executing,
284 0
285 ),
286 queuedTasks: this.workerNodes.reduce(
287 (accumulator, workerNode) =>
288 accumulator + workerNode.usage.tasks.queued,
289 0
290 ),
291 maxQueuedTasks: this.workerNodes.reduce(
292 (accumulator, workerNode) =>
293 accumulator + workerNode.usage.tasks.maxQueued,
294 0
295 ),
296 failedTasks: this.workerNodes.reduce(
297 (accumulator, workerNode) =>
298 accumulator + workerNode.usage.tasks.failed,
299 0
300 ),
301 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
302 .runTime.aggregate && {
303 runTime: {
304 minimum: round(
305 Math.min(
306 ...this.workerNodes.map(
307 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
308 )
309 )
310 ),
311 maximum: round(
312 Math.max(
313 ...this.workerNodes.map(
314 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
315 )
316 )
317 ),
318 average: round(
319 this.workerNodes.reduce(
320 (accumulator, workerNode) =>
321 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
322 0
323 ) /
324 this.workerNodes.reduce(
325 (accumulator, workerNode) =>
326 accumulator + (workerNode.usage.tasks?.executed ?? 0),
327 0
328 )
329 ),
330 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
331 .runTime.median && {
332 median: round(
333 median(
334 this.workerNodes.map(
335 workerNode => workerNode.usage.runTime?.median ?? 0
336 )
337 )
338 )
339 })
340 }
341 }),
342 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
343 .waitTime.aggregate && {
344 waitTime: {
345 minimum: round(
346 Math.min(
347 ...this.workerNodes.map(
348 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
349 )
350 )
351 ),
352 maximum: round(
353 Math.max(
354 ...this.workerNodes.map(
355 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
356 )
357 )
358 ),
359 average: round(
360 this.workerNodes.reduce(
361 (accumulator, workerNode) =>
362 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
363 0
364 ) /
365 this.workerNodes.reduce(
366 (accumulator, workerNode) =>
367 accumulator + (workerNode.usage.tasks?.executed ?? 0),
368 0
369 )
370 ),
371 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
372 .waitTime.median && {
373 median: round(
374 median(
375 this.workerNodes.map(
376 workerNode => workerNode.usage.waitTime?.median ?? 0
377 )
378 )
379 )
380 })
381 }
382 })
383 }
384 }
385
386 /**
387 * Gets the approximate pool utilization.
388 *
389 * @returns The pool utilization.
390 */
391 private get utilization (): number {
392 const poolRunTimeCapacity =
393 (performance.now() - this.startTimestamp) * this.maxSize
394 const totalTasksRunTime = this.workerNodes.reduce(
395 (accumulator, workerNode) =>
396 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
397 0
398 )
399 const totalTasksWaitTime = this.workerNodes.reduce(
400 (accumulator, workerNode) =>
401 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
402 0
403 )
404 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
405 }
406
407 /**
408 * Pool type.
409 *
410 * If it is `'dynamic'`, it provides the `max` property.
411 */
412 protected abstract get type (): PoolType
413
414 /**
415 * Gets the worker type.
416 */
417 protected abstract get worker (): WorkerType
418
419 /**
420 * Pool minimum size.
421 */
422 protected abstract get minSize (): number
423
424 /**
425 * Pool maximum size.
426 */
427 protected abstract get maxSize (): number
428
429 /**
430 * Get the worker given its id.
431 *
432 * @param workerId - The worker id.
433 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
434 */
435 private getWorkerById (workerId: number): Worker | undefined {
436 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
437 ?.worker
438 }
439
440 /**
441 * Gets the given worker its worker node key.
442 *
443 * @param worker - The worker.
444 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
445 */
446 private getWorkerNodeKey (worker: Worker): number {
447 return this.workerNodes.findIndex(
448 workerNode => workerNode.worker === worker
449 )
450 }
451
452 /** @inheritDoc */
453 public setWorkerChoiceStrategy (
454 workerChoiceStrategy: WorkerChoiceStrategy,
455 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
456 ): void {
457 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
458 this.opts.workerChoiceStrategy = workerChoiceStrategy
459 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
460 this.opts.workerChoiceStrategy
461 )
462 if (workerChoiceStrategyOptions != null) {
463 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
464 }
465 for (const workerNode of this.workerNodes) {
466 this.setWorkerNodeTasksUsage(
467 workerNode,
468 this.getInitialWorkerUsage(workerNode.worker)
469 )
470 this.setWorkerStatistics(workerNode.worker)
471 }
472 }
473
474 /** @inheritDoc */
475 public setWorkerChoiceStrategyOptions (
476 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
477 ): void {
478 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
479 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
480 this.workerChoiceStrategyContext.setOptions(
481 this.opts.workerChoiceStrategyOptions
482 )
483 }
484
485 /** @inheritDoc */
486 public enableTasksQueue (
487 enable: boolean,
488 tasksQueueOptions?: TasksQueueOptions
489 ): void {
490 if (this.opts.enableTasksQueue === true && !enable) {
491 this.flushTasksQueues()
492 }
493 this.opts.enableTasksQueue = enable
494 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
495 }
496
497 /** @inheritDoc */
498 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
499 if (this.opts.enableTasksQueue === true) {
500 this.checkValidTasksQueueOptions(tasksQueueOptions)
501 this.opts.tasksQueueOptions =
502 this.buildTasksQueueOptions(tasksQueueOptions)
503 } else if (this.opts.tasksQueueOptions != null) {
504 delete this.opts.tasksQueueOptions
505 }
506 }
507
508 private buildTasksQueueOptions (
509 tasksQueueOptions: TasksQueueOptions
510 ): TasksQueueOptions {
511 return {
512 concurrency: tasksQueueOptions?.concurrency ?? 1
513 }
514 }
515
516 /**
517 * Whether the pool is full or not.
518 *
519 * The pool filling boolean status.
520 */
521 protected get full (): boolean {
522 return this.workerNodes.length >= this.maxSize
523 }
524
525 /**
526 * Whether the pool is busy or not.
527 *
528 * The pool busyness boolean status.
529 */
530 protected abstract get busy (): boolean
531
532 /**
533 * Whether worker nodes are executing at least one task.
534 *
535 * @returns Worker nodes busyness boolean status.
536 */
537 protected internalBusy (): boolean {
538 return (
539 this.workerNodes.findIndex(workerNode => {
540 return workerNode.usage.tasks.executing === 0
541 }) === -1
542 )
543 }
544
545 /** @inheritDoc */
546 public async execute (data?: Data, name?: string): Promise<Response> {
547 const timestamp = performance.now()
548 const workerNodeKey = this.chooseWorkerNode()
549 const submittedTask: Task<Data> = {
550 name,
551 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
552 data: data ?? ({} as Data),
553 timestamp,
554 id: randomUUID()
555 }
556 const res = new Promise<Response>((resolve, reject) => {
557 this.promiseResponseMap.set(submittedTask.id as string, {
558 resolve,
559 reject,
560 worker: this.workerNodes[workerNodeKey].worker
561 })
562 })
563 if (
564 this.opts.enableTasksQueue === true &&
565 (this.busy ||
566 this.workerNodes[workerNodeKey].usage.tasks.executing >=
567 ((this.opts.tasksQueueOptions as TasksQueueOptions)
568 .concurrency as number))
569 ) {
570 this.enqueueTask(workerNodeKey, submittedTask)
571 } else {
572 this.executeTask(workerNodeKey, submittedTask)
573 }
574 this.checkAndEmitEvents()
575 // eslint-disable-next-line @typescript-eslint/return-await
576 return res
577 }
578
579 /** @inheritDoc */
580 public async destroy (): Promise<void> {
581 await Promise.all(
582 this.workerNodes.map(async (workerNode, workerNodeKey) => {
583 this.flushTasksQueue(workerNodeKey)
584 // FIXME: wait for tasks to be finished
585 const workerExitPromise = new Promise<void>(resolve => {
586 workerNode.worker.on('exit', () => {
587 resolve()
588 })
589 })
590 await this.destroyWorker(workerNode.worker)
591 await workerExitPromise
592 })
593 )
594 }
595
596 /**
597 * Terminates the given worker.
598 *
599 * @param worker - A worker within `workerNodes`.
600 */
601 protected abstract destroyWorker (worker: Worker): void | Promise<void>
602
603 /**
604 * Setup hook to execute code before worker nodes are created in the abstract constructor.
605 * Can be overridden.
606 *
607 * @virtual
608 */
609 protected setupHook (): void {
610 // Intentionally empty
611 }
612
613 /**
614 * Should return whether the worker is the main worker or not.
615 */
616 protected abstract isMain (): boolean
617
618 /**
619 * Hook executed before the worker task execution.
620 * Can be overridden.
621 *
622 * @param workerNodeKey - The worker node key.
623 * @param task - The task to execute.
624 */
625 protected beforeTaskExecutionHook (
626 workerNodeKey: number,
627 task: Task<Data>
628 ): void {
629 const workerUsage = this.workerNodes[workerNodeKey].usage
630 ++workerUsage.tasks.executing
631 this.updateWaitTimeWorkerUsage(workerUsage, task)
632 }
633
634 /**
635 * Hook executed after the worker task execution.
636 * Can be overridden.
637 *
638 * @param worker - The worker.
639 * @param message - The received message.
640 */
641 protected afterTaskExecutionHook (
642 worker: Worker,
643 message: MessageValue<Response>
644 ): void {
645 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
646 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
647 this.updateRunTimeWorkerUsage(workerUsage, message)
648 this.updateEluWorkerUsage(workerUsage, message)
649 }
650
651 private updateTaskStatisticsWorkerUsage (
652 workerUsage: WorkerUsage,
653 message: MessageValue<Response>
654 ): void {
655 const workerTaskStatistics = workerUsage.tasks
656 --workerTaskStatistics.executing
657 if (message.taskError == null) {
658 ++workerTaskStatistics.executed
659 } else {
660 ++workerTaskStatistics.failed
661 }
662 }
663
664 private updateRunTimeWorkerUsage (
665 workerUsage: WorkerUsage,
666 message: MessageValue<Response>
667 ): void {
668 if (
669 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
670 .aggregate
671 ) {
672 const taskRunTime = message.taskPerformance?.runTime ?? 0
673 workerUsage.runTime.aggregate =
674 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
675 workerUsage.runTime.minimum = Math.min(
676 taskRunTime,
677 workerUsage.runTime?.minimum ?? Infinity
678 )
679 workerUsage.runTime.maximum = Math.max(
680 taskRunTime,
681 workerUsage.runTime?.maximum ?? -Infinity
682 )
683 if (
684 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
685 .average &&
686 workerUsage.tasks.executed !== 0
687 ) {
688 workerUsage.runTime.average =
689 workerUsage.runTime.aggregate / workerUsage.tasks.executed
690 }
691 if (
692 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
693 .median &&
694 message.taskPerformance?.runTime != null
695 ) {
696 workerUsage.runTime.history.push(message.taskPerformance.runTime)
697 workerUsage.runTime.median = median(workerUsage.runTime.history)
698 }
699 }
700 }
701
702 private updateWaitTimeWorkerUsage (
703 workerUsage: WorkerUsage,
704 task: Task<Data>
705 ): void {
706 const timestamp = performance.now()
707 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
708 if (
709 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
710 .aggregate
711 ) {
712 workerUsage.waitTime.aggregate =
713 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
714 workerUsage.waitTime.minimum = Math.min(
715 taskWaitTime,
716 workerUsage.waitTime?.minimum ?? Infinity
717 )
718 workerUsage.waitTime.maximum = Math.max(
719 taskWaitTime,
720 workerUsage.waitTime?.maximum ?? -Infinity
721 )
722 if (
723 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
724 .waitTime.average &&
725 workerUsage.tasks.executed !== 0
726 ) {
727 workerUsage.waitTime.average =
728 workerUsage.waitTime.aggregate / workerUsage.tasks.executed
729 }
730 if (
731 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
732 .waitTime.median
733 ) {
734 workerUsage.waitTime.history.push(taskWaitTime)
735 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
736 }
737 }
738 }
739
740 private updateEluWorkerUsage (
741 workerUsage: WorkerUsage,
742 message: MessageValue<Response>
743 ): void {
744 if (
745 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
746 .aggregate
747 ) {
748 if (message.taskPerformance?.elu != null) {
749 workerUsage.elu.idle.aggregate =
750 (workerUsage.elu.idle?.aggregate ?? 0) +
751 message.taskPerformance.elu.idle
752 workerUsage.elu.active.aggregate =
753 (workerUsage.elu.active?.aggregate ?? 0) +
754 message.taskPerformance.elu.active
755 if (workerUsage.elu.utilization != null) {
756 workerUsage.elu.utilization =
757 (workerUsage.elu.utilization +
758 message.taskPerformance.elu.utilization) /
759 2
760 } else {
761 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
762 }
763 workerUsage.elu.idle.minimum = Math.min(
764 message.taskPerformance.elu.idle,
765 workerUsage.elu.idle?.minimum ?? Infinity
766 )
767 workerUsage.elu.idle.maximum = Math.max(
768 message.taskPerformance.elu.idle,
769 workerUsage.elu.idle?.maximum ?? -Infinity
770 )
771 workerUsage.elu.active.minimum = Math.min(
772 message.taskPerformance.elu.active,
773 workerUsage.elu.active?.minimum ?? Infinity
774 )
775 workerUsage.elu.active.maximum = Math.max(
776 message.taskPerformance.elu.active,
777 workerUsage.elu.active?.maximum ?? -Infinity
778 )
779 if (
780 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
781 .average &&
782 workerUsage.tasks.executed !== 0
783 ) {
784 workerUsage.elu.idle.average =
785 workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
786 workerUsage.elu.active.average =
787 workerUsage.elu.active.aggregate / workerUsage.tasks.executed
788 }
789 if (
790 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
791 .median
792 ) {
793 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
794 workerUsage.elu.active.history.push(
795 message.taskPerformance.elu.active
796 )
797 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
798 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
799 }
800 }
801 }
802 }
803
804 /**
805 * Chooses a worker node for the next task.
806 *
807 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
808 *
809 * @returns The worker node key
810 */
811 private chooseWorkerNode (): number {
812 if (this.shallCreateDynamicWorker()) {
813 const worker = this.createAndSetupDynamicWorker()
814 if (
815 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
816 ) {
817 return this.getWorkerNodeKey(worker)
818 }
819 }
820 return this.workerChoiceStrategyContext.execute()
821 }
822
823 /**
824 * Conditions for dynamic worker creation.
825 *
826 * @returns Whether to create a dynamic worker or not.
827 */
828 private shallCreateDynamicWorker (): boolean {
829 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
830 }
831
832 /**
833 * Sends a message to the given worker.
834 *
835 * @param worker - The worker which should receive the message.
836 * @param message - The message.
837 */
838 protected abstract sendToWorker (
839 worker: Worker,
840 message: MessageValue<Data>
841 ): void
842
843 /**
844 * Registers a listener callback on the given worker.
845 *
846 * @param worker - The worker which should register a listener.
847 * @param listener - The message listener callback.
848 */
849 private registerWorkerMessageListener<Message extends Data | Response>(
850 worker: Worker,
851 listener: (message: MessageValue<Message>) => void
852 ): void {
853 worker.on('message', listener as MessageHandler<Worker>)
854 }
855
856 /**
857 * Creates a new worker.
858 *
859 * @returns Newly created worker.
860 */
861 protected abstract createWorker (): Worker
862
863 /**
864 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
865 * Can be overridden.
866 *
867 * @param worker - The newly created worker.
868 */
869 protected afterWorkerSetup (worker: Worker): void {
870 // Listen to worker messages.
871 this.registerWorkerMessageListener(worker, this.workerListener())
872 }
873
874 /**
875 * Creates a new worker and sets it up completely in the pool worker nodes.
876 *
877 * @returns New, completely set up worker.
878 */
879 protected createAndSetupWorker (): Worker {
880 const worker = this.createWorker()
881
882 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
883 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
884 worker.on('error', error => {
885 if (this.emitter != null) {
886 this.emitter.emit(PoolEvents.error, error)
887 }
888 if (this.opts.enableTasksQueue === true) {
889 this.redistributeQueuedTasks(worker)
890 }
891 if (this.opts.restartWorkerOnError === true) {
892 if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
893 this.createAndSetupDynamicWorker()
894 } else {
895 this.createAndSetupWorker()
896 }
897 }
898 })
899 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
900 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
901 worker.once('exit', () => {
902 this.removeWorkerNode(worker)
903 })
904
905 this.pushWorkerNode(worker)
906
907 this.setWorkerStatistics(worker)
908
909 this.afterWorkerSetup(worker)
910
911 return worker
912 }
913
914 private redistributeQueuedTasks (worker: Worker): void {
915 const workerNodeKey = this.getWorkerNodeKey(worker)
916 while (this.tasksQueueSize(workerNodeKey) > 0) {
917 let targetWorkerNodeKey: number = workerNodeKey
918 let minQueuedTasks = Infinity
919 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
920 if (
921 workerNodeId !== workerNodeKey &&
922 workerNode.usage.tasks.queued === 0
923 ) {
924 targetWorkerNodeKey = workerNodeId
925 break
926 }
927 if (
928 workerNodeId !== workerNodeKey &&
929 workerNode.usage.tasks.queued < minQueuedTasks
930 ) {
931 minQueuedTasks = workerNode.usage.tasks.queued
932 targetWorkerNodeKey = workerNodeId
933 }
934 }
935 this.enqueueTask(
936 targetWorkerNodeKey,
937 this.dequeueTask(workerNodeKey) as Task<Data>
938 )
939 }
940 }
941
942 /**
943 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
944 *
945 * @returns New, completely set up dynamic worker.
946 */
947 protected createAndSetupDynamicWorker (): Worker {
948 const worker = this.createAndSetupWorker()
949 this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
950 this.registerWorkerMessageListener(worker, message => {
951 const workerNodeKey = this.getWorkerNodeKey(worker)
952 if (
953 isKillBehavior(KillBehaviors.HARD, message.kill) ||
954 (message.kill != null &&
955 ((this.opts.enableTasksQueue === false &&
956 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
957 (this.opts.enableTasksQueue === true &&
958 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
959 this.tasksQueueSize(workerNodeKey) === 0)))
960 ) {
961 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
962 void (this.destroyWorker(worker) as Promise<void>)
963 }
964 })
965 return worker
966 }
967
968 /**
969 * This function is the listener registered for each worker message.
970 *
971 * @returns The listener function to execute when a message is received from a worker.
972 */
973 protected workerListener (): (message: MessageValue<Response>) => void {
974 return message => {
975 if (message.workerId != null && message.started != null) {
976 // Worker started message received
977 this.handleWorkerStartedMessage(message)
978 } else if (message.id != null) {
979 // Task execution response received
980 this.handleTaskExecutionResponse(message)
981 }
982 }
983 }
984
985 private handleWorkerStartedMessage (message: MessageValue<Response>): void {
986 // Worker started message received
987 const worker = this.getWorkerById(message.workerId as number)
988 if (worker != null) {
989 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
990 message.started as boolean
991 } else {
992 throw new Error(
993 `Worker started message received from unknown worker '${
994 message.workerId as number
995 }'`
996 )
997 }
998 }
999
1000 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1001 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1002 if (promiseResponse != null) {
1003 if (message.taskError != null) {
1004 if (this.emitter != null) {
1005 this.emitter.emit(PoolEvents.taskError, message.taskError)
1006 }
1007 promiseResponse.reject(message.taskError.message)
1008 } else {
1009 promiseResponse.resolve(message.data as Response)
1010 }
1011 this.afterTaskExecutionHook(promiseResponse.worker, message)
1012 this.promiseResponseMap.delete(message.id as string)
1013 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1014 if (
1015 this.opts.enableTasksQueue === true &&
1016 this.tasksQueueSize(workerNodeKey) > 0
1017 ) {
1018 this.executeTask(
1019 workerNodeKey,
1020 this.dequeueTask(workerNodeKey) as Task<Data>
1021 )
1022 }
1023 this.workerChoiceStrategyContext.update(workerNodeKey)
1024 }
1025 }
1026
1027 private checkAndEmitEvents (): void {
1028 if (this.emitter != null) {
1029 if (this.busy) {
1030 this.emitter.emit(PoolEvents.busy, this.info)
1031 }
1032 if (this.type === PoolTypes.dynamic && this.full) {
1033 this.emitter.emit(PoolEvents.full, this.info)
1034 }
1035 }
1036 }
1037
1038 /**
1039 * Sets the given worker node its tasks usage in the pool.
1040 *
1041 * @param workerNode - The worker node.
1042 * @param workerUsage - The worker usage.
1043 */
1044 private setWorkerNodeTasksUsage (
1045 workerNode: WorkerNode<Worker, Data>,
1046 workerUsage: WorkerUsage
1047 ): void {
1048 workerNode.usage = workerUsage
1049 }
1050
1051 /**
1052 * Gets the worker information.
1053 *
1054 * @param workerNodeKey - The worker node key.
1055 */
1056 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1057 return this.workerNodes[workerNodeKey].info
1058 }
1059
1060 /**
1061 * Pushes the given worker in the pool worker nodes.
1062 *
1063 * @param worker - The worker.
1064 * @returns The worker nodes length.
1065 */
1066 private pushWorkerNode (worker: Worker): number {
1067 this.workerNodes.push({
1068 worker,
1069 info: this.getInitialWorkerInfo(worker),
1070 usage: this.getInitialWorkerUsage(),
1071 tasksQueue: new Queue<Task<Data>>()
1072 })
1073 this.setWorkerNodeTasksUsage(
1074 this.workerNodes[this.getWorkerNodeKey(worker)],
1075 this.getInitialWorkerUsage(worker)
1076 )
1077 return this.workerNodes.length
1078 }
1079
1080 /**
1081 * Gets the worker id.
1082 *
1083 * @param worker - The worker.
1084 * @returns The worker id.
1085 */
1086 private getWorkerId (worker: Worker): number | undefined {
1087 if (this.worker === WorkerTypes.thread) {
1088 return worker.threadId
1089 } else if (this.worker === WorkerTypes.cluster) {
1090 return worker.id
1091 }
1092 }
1093
1094 // /**
1095 // * Sets the given worker in the pool worker nodes.
1096 // *
1097 // * @param workerNodeKey - The worker node key.
1098 // * @param worker - The worker.
1099 // * @param workerInfo - The worker info.
1100 // * @param workerUsage - The worker usage.
1101 // * @param tasksQueue - The worker task queue.
1102 // */
1103 // private setWorkerNode (
1104 // workerNodeKey: number,
1105 // worker: Worker,
1106 // workerInfo: WorkerInfo,
1107 // workerUsage: WorkerUsage,
1108 // tasksQueue: Queue<Task<Data>>
1109 // ): void {
1110 // this.workerNodes[workerNodeKey] = {
1111 // worker,
1112 // info: workerInfo,
1113 // usage: workerUsage,
1114 // tasksQueue
1115 // }
1116 // }
1117
1118 /**
1119 * Removes the given worker from the pool worker nodes.
1120 *
1121 * @param worker - The worker.
1122 */
1123 private removeWorkerNode (worker: Worker): void {
1124 const workerNodeKey = this.getWorkerNodeKey(worker)
1125 if (workerNodeKey !== -1) {
1126 this.workerNodes.splice(workerNodeKey, 1)
1127 this.workerChoiceStrategyContext.remove(workerNodeKey)
1128 }
1129 }
1130
1131 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1132 this.beforeTaskExecutionHook(workerNodeKey, task)
1133 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1134 }
1135
1136 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1137 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
1138 }
1139
1140 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1141 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
1142 }
1143
1144 private tasksQueueSize (workerNodeKey: number): number {
1145 return this.workerNodes[workerNodeKey].tasksQueue.size
1146 }
1147
1148 private tasksMaxQueueSize (workerNodeKey: number): number {
1149 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
1150 }
1151
1152 private flushTasksQueue (workerNodeKey: number): void {
1153 while (this.tasksQueueSize(workerNodeKey) > 0) {
1154 this.executeTask(
1155 workerNodeKey,
1156 this.dequeueTask(workerNodeKey) as Task<Data>
1157 )
1158 }
1159 this.workerNodes[workerNodeKey].tasksQueue.clear()
1160 }
1161
1162 private flushTasksQueues (): void {
1163 for (const [workerNodeKey] of this.workerNodes.entries()) {
1164 this.flushTasksQueue(workerNodeKey)
1165 }
1166 }
1167
1168 private setWorkerStatistics (worker: Worker): void {
1169 this.sendToWorker(worker, {
1170 statistics: {
1171 runTime:
1172 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1173 .runTime.aggregate,
1174 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1175 .elu.aggregate
1176 }
1177 })
1178 }
1179
1180 private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
1181 const getTasksQueueSize = (worker?: Worker): number => {
1182 if (worker == null) {
1183 return 0
1184 }
1185 return this.tasksQueueSize(this.getWorkerNodeKey(worker))
1186 }
1187 const getTasksMaxQueueSize = (worker?: Worker): number => {
1188 if (worker == null) {
1189 return 0
1190 }
1191 return this.tasksMaxQueueSize(this.getWorkerNodeKey(worker))
1192 }
1193 return {
1194 tasks: {
1195 executed: 0,
1196 executing: 0,
1197 get queued (): number {
1198 return getTasksQueueSize(worker)
1199 },
1200 get maxQueued (): number {
1201 return getTasksMaxQueueSize(worker)
1202 },
1203 failed: 0
1204 },
1205 runTime: {
1206 history: new CircularArray()
1207 },
1208 waitTime: {
1209 history: new CircularArray()
1210 },
1211 elu: {
1212 idle: {
1213 history: new CircularArray()
1214 },
1215 active: {
1216 history: new CircularArray()
1217 }
1218 }
1219 }
1220 }
1221
1222 private getInitialWorkerInfo (worker: Worker): WorkerInfo {
1223 return { id: this.getWorkerId(worker), dynamic: false, started: true }
1224 }
1225 }