Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
... / ...
CommitLineData
1import { randomUUID } from 'node:crypto'
2import { performance } from 'node:perf_hooks'
3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11} from '../utils'
12import { KillBehaviors } from '../worker/worker-options'
13import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions
22} from './pool'
23import type {
24 IWorker,
25 IWorkerNode,
26 MessageHandler,
27 Task,
28 WorkerInfo,
29 WorkerType,
30 WorkerUsage
31} from './worker'
32import {
33 Measurements,
34 WorkerChoiceStrategies,
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
37} from './selection-strategies/selection-strategies-types'
38import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
39import { version } from './version'
40import { WorkerNode } from './worker-node'
41
42/**
43 * Base class that implements some shared logic for all poolifier pools.
44 *
45 * @typeParam Worker - Type of worker which manages this pool.
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
48 */
49export abstract class AbstractPool<
50 Worker extends IWorker,
51 Data = unknown,
52 Response = unknown
53> implements IPool<Worker, Data, Response> {
54 /** @inheritDoc */
55 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
56
57 /** @inheritDoc */
58 public readonly emitter?: PoolEmitter
59
60 /**
61 * The execution response promise map.
62 *
63 * - `key`: The message id of each submitted task.
64 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
65 *
66 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
67 */
68 protected promiseResponseMap: Map<
69 string,
70 PromiseResponseWrapper<Worker, Response>
71 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
72
73 /**
74 * Worker choice strategy context referencing a worker choice algorithm implementation.
75 */
76 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
77 Worker,
78 Data,
79 Response
80 >
81
82 /**
83 * The start timestamp of the pool.
84 */
85 private readonly startTimestamp
86
87 /**
88 * Constructs a new poolifier pool.
89 *
90 * @param numberOfWorkers - Number of workers that this pool should manage.
91 * @param filePath - Path to the worker file.
92 * @param opts - Options for the pool.
93 */
94 public constructor (
95 protected readonly numberOfWorkers: number,
96 protected readonly filePath: string,
97 protected readonly opts: PoolOptions<Worker>
98 ) {
99 if (!this.isMain()) {
100 throw new Error('Cannot start a pool from a worker!')
101 }
102 this.checkNumberOfWorkers(this.numberOfWorkers)
103 this.checkFilePath(this.filePath)
104 this.checkPoolOptions(this.opts)
105
106 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
107 this.executeTask = this.executeTask.bind(this)
108 this.enqueueTask = this.enqueueTask.bind(this)
109 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
110
111 if (this.opts.enableEvents === true) {
112 this.emitter = new PoolEmitter()
113 }
114 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
115 Worker,
116 Data,
117 Response
118 >(
119 this,
120 this.opts.workerChoiceStrategy,
121 this.opts.workerChoiceStrategyOptions
122 )
123
124 this.setupHook()
125
126 while (this.workerNodes.length < this.numberOfWorkers) {
127 this.createAndSetupWorker()
128 }
129
130 this.startTimestamp = performance.now()
131 }
132
133 private checkFilePath (filePath: string): void {
134 if (
135 filePath == null ||
136 (typeof filePath === 'string' && filePath.trim().length === 0)
137 ) {
138 throw new Error('Please specify a file with a worker implementation')
139 }
140 }
141
142 private checkNumberOfWorkers (numberOfWorkers: number): void {
143 if (numberOfWorkers == null) {
144 throw new Error(
145 'Cannot instantiate a pool without specifying the number of workers'
146 )
147 } else if (!Number.isSafeInteger(numberOfWorkers)) {
148 throw new TypeError(
149 'Cannot instantiate a pool with a non safe integer number of workers'
150 )
151 } else if (numberOfWorkers < 0) {
152 throw new RangeError(
153 'Cannot instantiate a pool with a negative number of workers'
154 )
155 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
156 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
157 }
158 }
159
160 protected checkDynamicPoolSize (min: number, max: number): void {
161 if (this.type === PoolTypes.dynamic) {
162 if (min > max) {
163 throw new RangeError(
164 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
165 )
166 } else if (min === 0 && max === 0) {
167 throw new RangeError(
168 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
169 )
170 } else if (min === max) {
171 throw new RangeError(
172 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
173 )
174 }
175 }
176 }
177
178 private checkPoolOptions (opts: PoolOptions<Worker>): void {
179 if (isPlainObject(opts)) {
180 this.opts.workerChoiceStrategy =
181 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
182 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
183 this.opts.workerChoiceStrategyOptions =
184 opts.workerChoiceStrategyOptions ??
185 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
186 this.checkValidWorkerChoiceStrategyOptions(
187 this.opts.workerChoiceStrategyOptions
188 )
189 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
190 this.opts.enableEvents = opts.enableEvents ?? true
191 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
192 if (this.opts.enableTasksQueue) {
193 this.checkValidTasksQueueOptions(
194 opts.tasksQueueOptions as TasksQueueOptions
195 )
196 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
197 opts.tasksQueueOptions as TasksQueueOptions
198 )
199 }
200 } else {
201 throw new TypeError('Invalid pool options: must be a plain object')
202 }
203 }
204
205 private checkValidWorkerChoiceStrategy (
206 workerChoiceStrategy: WorkerChoiceStrategy
207 ): void {
208 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
209 throw new Error(
210 `Invalid worker choice strategy '${workerChoiceStrategy}'`
211 )
212 }
213 }
214
215 private checkValidWorkerChoiceStrategyOptions (
216 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
217 ): void {
218 if (!isPlainObject(workerChoiceStrategyOptions)) {
219 throw new TypeError(
220 'Invalid worker choice strategy options: must be a plain object'
221 )
222 }
223 if (
224 workerChoiceStrategyOptions.weights != null &&
225 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
226 ) {
227 throw new Error(
228 'Invalid worker choice strategy options: must have a weight for each worker node'
229 )
230 }
231 if (
232 workerChoiceStrategyOptions.measurement != null &&
233 !Object.values(Measurements).includes(
234 workerChoiceStrategyOptions.measurement
235 )
236 ) {
237 throw new Error(
238 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
239 )
240 }
241 }
242
243 private checkValidTasksQueueOptions (
244 tasksQueueOptions: TasksQueueOptions
245 ): void {
246 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
247 throw new TypeError('Invalid tasks queue options: must be a plain object')
248 }
249 if (
250 tasksQueueOptions?.concurrency != null &&
251 !Number.isSafeInteger(tasksQueueOptions.concurrency)
252 ) {
253 throw new TypeError(
254 'Invalid worker tasks concurrency: must be an integer'
255 )
256 }
257 if (
258 tasksQueueOptions?.concurrency != null &&
259 tasksQueueOptions.concurrency <= 0
260 ) {
261 throw new Error(
262 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
263 )
264 }
265 }
266
267 /** @inheritDoc */
268 public get info (): PoolInfo {
269 return {
270 version,
271 type: this.type,
272 worker: this.worker,
273 ready: this.ready,
274 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
275 minSize: this.minSize,
276 maxSize: this.maxSize,
277 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
278 .runTime.aggregate &&
279 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
280 .waitTime.aggregate && { utilization: round(this.utilization) }),
281 workerNodes: this.workerNodes.length,
282 idleWorkerNodes: this.workerNodes.reduce(
283 (accumulator, workerNode) =>
284 workerNode.usage.tasks.executing === 0
285 ? accumulator + 1
286 : accumulator,
287 0
288 ),
289 busyWorkerNodes: this.workerNodes.reduce(
290 (accumulator, workerNode) =>
291 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
292 0
293 ),
294 executedTasks: this.workerNodes.reduce(
295 (accumulator, workerNode) =>
296 accumulator + workerNode.usage.tasks.executed,
297 0
298 ),
299 executingTasks: this.workerNodes.reduce(
300 (accumulator, workerNode) =>
301 accumulator + workerNode.usage.tasks.executing,
302 0
303 ),
304 queuedTasks: this.workerNodes.reduce(
305 (accumulator, workerNode) =>
306 accumulator + workerNode.usage.tasks.queued,
307 0
308 ),
309 maxQueuedTasks: this.workerNodes.reduce(
310 (accumulator, workerNode) =>
311 accumulator + workerNode.usage.tasks.maxQueued,
312 0
313 ),
314 failedTasks: this.workerNodes.reduce(
315 (accumulator, workerNode) =>
316 accumulator + workerNode.usage.tasks.failed,
317 0
318 ),
319 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
320 .runTime.aggregate && {
321 runTime: {
322 minimum: round(
323 Math.min(
324 ...this.workerNodes.map(
325 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
326 )
327 )
328 ),
329 maximum: round(
330 Math.max(
331 ...this.workerNodes.map(
332 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
333 )
334 )
335 ),
336 average: round(
337 this.workerNodes.reduce(
338 (accumulator, workerNode) =>
339 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
340 0
341 ) /
342 this.workerNodes.reduce(
343 (accumulator, workerNode) =>
344 accumulator + (workerNode.usage.tasks?.executed ?? 0),
345 0
346 )
347 ),
348 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
349 .runTime.median && {
350 median: round(
351 median(
352 this.workerNodes.map(
353 workerNode => workerNode.usage.runTime?.median ?? 0
354 )
355 )
356 )
357 })
358 }
359 }),
360 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
361 .waitTime.aggregate && {
362 waitTime: {
363 minimum: round(
364 Math.min(
365 ...this.workerNodes.map(
366 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
367 )
368 )
369 ),
370 maximum: round(
371 Math.max(
372 ...this.workerNodes.map(
373 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
374 )
375 )
376 ),
377 average: round(
378 this.workerNodes.reduce(
379 (accumulator, workerNode) =>
380 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
381 0
382 ) /
383 this.workerNodes.reduce(
384 (accumulator, workerNode) =>
385 accumulator + (workerNode.usage.tasks?.executed ?? 0),
386 0
387 )
388 ),
389 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
390 .waitTime.median && {
391 median: round(
392 median(
393 this.workerNodes.map(
394 workerNode => workerNode.usage.waitTime?.median ?? 0
395 )
396 )
397 )
398 })
399 }
400 })
401 }
402 }
403
404 private get starting (): boolean {
405 return (
406 !this.full ||
407 (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready))
408 )
409 }
410
411 private get ready (): boolean {
412 return (
413 this.full && this.workerNodes.every(workerNode => workerNode.info.ready)
414 )
415 }
416
417 /**
418 * Gets the approximate pool utilization.
419 *
420 * @returns The pool utilization.
421 */
422 private get utilization (): number {
423 const poolRunTimeCapacity =
424 (performance.now() - this.startTimestamp) * this.maxSize
425 const totalTasksRunTime = this.workerNodes.reduce(
426 (accumulator, workerNode) =>
427 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
428 0
429 )
430 const totalTasksWaitTime = this.workerNodes.reduce(
431 (accumulator, workerNode) =>
432 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
433 0
434 )
435 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
436 }
437
438 /**
439 * Pool type.
440 *
441 * If it is `'dynamic'`, it provides the `max` property.
442 */
443 protected abstract get type (): PoolType
444
445 /**
446 * Gets the worker type.
447 */
448 protected abstract get worker (): WorkerType
449
450 /**
451 * Pool minimum size.
452 */
453 protected abstract get minSize (): number
454
455 /**
456 * Pool maximum size.
457 */
458 protected abstract get maxSize (): number
459
460 /**
461 * Get the worker given its id.
462 *
463 * @param workerId - The worker id.
464 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
465 */
466 private getWorkerById (workerId: number): Worker | undefined {
467 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
468 ?.worker
469 }
470
471 private checkMessageWorkerId (message: MessageValue<Response>): void {
472 if (
473 message.workerId != null &&
474 this.getWorkerById(message.workerId) == null
475 ) {
476 throw new Error(
477 `Worker message received from unknown worker '${message.workerId}'`
478 )
479 }
480 }
481
482 /**
483 * Gets the given worker its worker node key.
484 *
485 * @param worker - The worker.
486 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
487 */
488 private getWorkerNodeKey (worker: Worker): number {
489 return this.workerNodes.findIndex(
490 workerNode => workerNode.worker === worker
491 )
492 }
493
494 /** @inheritDoc */
495 public setWorkerChoiceStrategy (
496 workerChoiceStrategy: WorkerChoiceStrategy,
497 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
498 ): void {
499 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
500 this.opts.workerChoiceStrategy = workerChoiceStrategy
501 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
502 this.opts.workerChoiceStrategy
503 )
504 if (workerChoiceStrategyOptions != null) {
505 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
506 }
507 for (const workerNode of this.workerNodes) {
508 workerNode.resetUsage()
509 this.setWorkerStatistics(workerNode.worker)
510 }
511 }
512
513 /** @inheritDoc */
514 public setWorkerChoiceStrategyOptions (
515 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
516 ): void {
517 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
518 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
519 this.workerChoiceStrategyContext.setOptions(
520 this.opts.workerChoiceStrategyOptions
521 )
522 }
523
524 /** @inheritDoc */
525 public enableTasksQueue (
526 enable: boolean,
527 tasksQueueOptions?: TasksQueueOptions
528 ): void {
529 if (this.opts.enableTasksQueue === true && !enable) {
530 this.flushTasksQueues()
531 }
532 this.opts.enableTasksQueue = enable
533 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
534 }
535
536 /** @inheritDoc */
537 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
538 if (this.opts.enableTasksQueue === true) {
539 this.checkValidTasksQueueOptions(tasksQueueOptions)
540 this.opts.tasksQueueOptions =
541 this.buildTasksQueueOptions(tasksQueueOptions)
542 } else if (this.opts.tasksQueueOptions != null) {
543 delete this.opts.tasksQueueOptions
544 }
545 }
546
547 private buildTasksQueueOptions (
548 tasksQueueOptions: TasksQueueOptions
549 ): TasksQueueOptions {
550 return {
551 concurrency: tasksQueueOptions?.concurrency ?? 1
552 }
553 }
554
555 /**
556 * Whether the pool is full or not.
557 *
558 * The pool filling boolean status.
559 */
560 protected get full (): boolean {
561 return this.workerNodes.length >= this.maxSize
562 }
563
564 /**
565 * Whether the pool is busy or not.
566 *
567 * The pool busyness boolean status.
568 */
569 protected abstract get busy (): boolean
570
571 /**
572 * Whether worker nodes are executing at least one task.
573 *
574 * @returns Worker nodes busyness boolean status.
575 */
576 protected internalBusy (): boolean {
577 return (
578 this.workerNodes.findIndex(workerNode => {
579 return workerNode.usage.tasks.executing === 0
580 }) === -1
581 )
582 }
583
584 /** @inheritDoc */
585 public async execute (data?: Data, name?: string): Promise<Response> {
586 const timestamp = performance.now()
587 const workerNodeKey = this.chooseWorkerNode()
588 const submittedTask: Task<Data> = {
589 name,
590 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
591 data: data ?? ({} as Data),
592 timestamp,
593 workerId: this.getWorkerInfo(workerNodeKey).id as number,
594 id: randomUUID()
595 }
596 const res = new Promise<Response>((resolve, reject) => {
597 this.promiseResponseMap.set(submittedTask.id as string, {
598 resolve,
599 reject,
600 worker: this.workerNodes[workerNodeKey].worker
601 })
602 })
603 if (
604 this.opts.enableTasksQueue === true &&
605 (this.busy ||
606 this.workerNodes[workerNodeKey].usage.tasks.executing >=
607 ((this.opts.tasksQueueOptions as TasksQueueOptions)
608 .concurrency as number))
609 ) {
610 this.enqueueTask(workerNodeKey, submittedTask)
611 } else {
612 this.executeTask(workerNodeKey, submittedTask)
613 }
614 this.checkAndEmitEvents()
615 // eslint-disable-next-line @typescript-eslint/return-await
616 return res
617 }
618
619 /** @inheritDoc */
620 public async destroy (): Promise<void> {
621 await Promise.all(
622 this.workerNodes.map(async (workerNode, workerNodeKey) => {
623 this.flushTasksQueue(workerNodeKey)
624 // FIXME: wait for tasks to be finished
625 const workerExitPromise = new Promise<void>(resolve => {
626 workerNode.worker.on('exit', () => {
627 resolve()
628 })
629 })
630 await this.destroyWorker(workerNode.worker)
631 await workerExitPromise
632 })
633 )
634 }
635
636 /**
637 * Terminates the given worker.
638 *
639 * @param worker - A worker within `workerNodes`.
640 */
641 protected abstract destroyWorker (worker: Worker): void | Promise<void>
642
643 /**
644 * Setup hook to execute code before worker nodes are created in the abstract constructor.
645 * Can be overridden.
646 *
647 * @virtual
648 */
649 protected setupHook (): void {
650 // Intentionally empty
651 }
652
653 /**
654 * Should return whether the worker is the main worker or not.
655 */
656 protected abstract isMain (): boolean
657
658 /**
659 * Hook executed before the worker task execution.
660 * Can be overridden.
661 *
662 * @param workerNodeKey - The worker node key.
663 * @param task - The task to execute.
664 */
665 protected beforeTaskExecutionHook (
666 workerNodeKey: number,
667 task: Task<Data>
668 ): void {
669 const workerUsage = this.workerNodes[workerNodeKey].usage
670 ++workerUsage.tasks.executing
671 this.updateWaitTimeWorkerUsage(workerUsage, task)
672 }
673
674 /**
675 * Hook executed after the worker task execution.
676 * Can be overridden.
677 *
678 * @param worker - The worker.
679 * @param message - The received message.
680 */
681 protected afterTaskExecutionHook (
682 worker: Worker,
683 message: MessageValue<Response>
684 ): void {
685 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
686 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
687 this.updateRunTimeWorkerUsage(workerUsage, message)
688 this.updateEluWorkerUsage(workerUsage, message)
689 }
690
691 private updateTaskStatisticsWorkerUsage (
692 workerUsage: WorkerUsage,
693 message: MessageValue<Response>
694 ): void {
695 const workerTaskStatistics = workerUsage.tasks
696 --workerTaskStatistics.executing
697 if (message.taskError == null) {
698 ++workerTaskStatistics.executed
699 } else {
700 ++workerTaskStatistics.failed
701 }
702 }
703
704 private updateRunTimeWorkerUsage (
705 workerUsage: WorkerUsage,
706 message: MessageValue<Response>
707 ): void {
708 if (
709 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
710 .aggregate
711 ) {
712 const taskRunTime = message.taskPerformance?.runTime ?? 0
713 workerUsage.runTime.aggregate =
714 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
715 workerUsage.runTime.minimum = Math.min(
716 taskRunTime,
717 workerUsage.runTime?.minimum ?? Infinity
718 )
719 workerUsage.runTime.maximum = Math.max(
720 taskRunTime,
721 workerUsage.runTime?.maximum ?? -Infinity
722 )
723 if (
724 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
725 .average &&
726 workerUsage.tasks.executed !== 0
727 ) {
728 workerUsage.runTime.average =
729 workerUsage.runTime.aggregate / workerUsage.tasks.executed
730 }
731 if (
732 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
733 .median &&
734 message.taskPerformance?.runTime != null
735 ) {
736 workerUsage.runTime.history.push(message.taskPerformance.runTime)
737 workerUsage.runTime.median = median(workerUsage.runTime.history)
738 }
739 }
740 }
741
742 private updateWaitTimeWorkerUsage (
743 workerUsage: WorkerUsage,
744 task: Task<Data>
745 ): void {
746 const timestamp = performance.now()
747 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
748 if (
749 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
750 .aggregate
751 ) {
752 workerUsage.waitTime.aggregate =
753 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
754 workerUsage.waitTime.minimum = Math.min(
755 taskWaitTime,
756 workerUsage.waitTime?.minimum ?? Infinity
757 )
758 workerUsage.waitTime.maximum = Math.max(
759 taskWaitTime,
760 workerUsage.waitTime?.maximum ?? -Infinity
761 )
762 if (
763 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
764 .waitTime.average &&
765 workerUsage.tasks.executed !== 0
766 ) {
767 workerUsage.waitTime.average =
768 workerUsage.waitTime.aggregate / workerUsage.tasks.executed
769 }
770 if (
771 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
772 .waitTime.median
773 ) {
774 workerUsage.waitTime.history.push(taskWaitTime)
775 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
776 }
777 }
778 }
779
780 private updateEluWorkerUsage (
781 workerUsage: WorkerUsage,
782 message: MessageValue<Response>
783 ): void {
784 if (
785 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
786 .aggregate
787 ) {
788 if (message.taskPerformance?.elu != null) {
789 workerUsage.elu.idle.aggregate =
790 (workerUsage.elu.idle?.aggregate ?? 0) +
791 message.taskPerformance.elu.idle
792 workerUsage.elu.active.aggregate =
793 (workerUsage.elu.active?.aggregate ?? 0) +
794 message.taskPerformance.elu.active
795 if (workerUsage.elu.utilization != null) {
796 workerUsage.elu.utilization =
797 (workerUsage.elu.utilization +
798 message.taskPerformance.elu.utilization) /
799 2
800 } else {
801 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
802 }
803 workerUsage.elu.idle.minimum = Math.min(
804 message.taskPerformance.elu.idle,
805 workerUsage.elu.idle?.minimum ?? Infinity
806 )
807 workerUsage.elu.idle.maximum = Math.max(
808 message.taskPerformance.elu.idle,
809 workerUsage.elu.idle?.maximum ?? -Infinity
810 )
811 workerUsage.elu.active.minimum = Math.min(
812 message.taskPerformance.elu.active,
813 workerUsage.elu.active?.minimum ?? Infinity
814 )
815 workerUsage.elu.active.maximum = Math.max(
816 message.taskPerformance.elu.active,
817 workerUsage.elu.active?.maximum ?? -Infinity
818 )
819 if (
820 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
821 .average &&
822 workerUsage.tasks.executed !== 0
823 ) {
824 workerUsage.elu.idle.average =
825 workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
826 workerUsage.elu.active.average =
827 workerUsage.elu.active.aggregate / workerUsage.tasks.executed
828 }
829 if (
830 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
831 .median
832 ) {
833 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
834 workerUsage.elu.active.history.push(
835 message.taskPerformance.elu.active
836 )
837 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
838 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
839 }
840 }
841 }
842 }
843
844 /**
845 * Chooses a worker node for the next task.
846 *
847 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
848 *
849 * @returns The worker node key
850 */
851 private chooseWorkerNode (): number {
852 if (this.shallCreateDynamicWorker()) {
853 const worker = this.createAndSetupDynamicWorker()
854 if (
855 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
856 ) {
857 return this.getWorkerNodeKey(worker)
858 }
859 }
860 return this.workerChoiceStrategyContext.execute()
861 }
862
863 /**
864 * Conditions for dynamic worker creation.
865 *
866 * @returns Whether to create a dynamic worker or not.
867 */
868 private shallCreateDynamicWorker (): boolean {
869 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
870 }
871
872 /**
873 * Sends a message to the given worker.
874 *
875 * @param worker - The worker which should receive the message.
876 * @param message - The message.
877 */
878 protected abstract sendToWorker (
879 worker: Worker,
880 message: MessageValue<Data>
881 ): void
882
883 /**
884 * Registers a listener callback on the given worker.
885 *
886 * @param worker - The worker which should register a listener.
887 * @param listener - The message listener callback.
888 */
889 private registerWorkerMessageListener<Message extends Data | Response>(
890 worker: Worker,
891 listener: (message: MessageValue<Message>) => void
892 ): void {
893 worker.on('message', listener as MessageHandler<Worker>)
894 }
895
896 /**
897 * Creates a new worker.
898 *
899 * @returns Newly created worker.
900 */
901 protected abstract createWorker (): Worker
902
903 /**
904 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
905 * Can be overridden.
906 *
907 * @param worker - The newly created worker.
908 */
909 protected afterWorkerSetup (worker: Worker): void {
910 // Listen to worker messages.
911 this.registerWorkerMessageListener(worker, this.workerListener())
912 // Send startup message to worker.
913 this.sendToWorker(worker, {
914 ready: false,
915 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
916 })
917 // Setup worker task statistics computation.
918 this.setWorkerStatistics(worker)
919 }
920
921 /**
922 * Creates a new worker and sets it up completely in the pool worker nodes.
923 *
924 * @returns New, completely set up worker.
925 */
926 protected createAndSetupWorker (): Worker {
927 const worker = this.createWorker()
928
929 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
930 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
931 worker.on('error', error => {
932 if (this.emitter != null) {
933 this.emitter.emit(PoolEvents.error, error)
934 }
935 if (this.opts.enableTasksQueue === true) {
936 this.redistributeQueuedTasks(worker)
937 }
938 if (this.opts.restartWorkerOnError === true && !this.starting) {
939 if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
940 this.createAndSetupDynamicWorker()
941 } else {
942 this.createAndSetupWorker()
943 }
944 }
945 })
946 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
947 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
948 worker.once('exit', () => {
949 this.removeWorkerNode(worker)
950 })
951
952 this.pushWorkerNode(worker)
953
954 this.afterWorkerSetup(worker)
955
956 return worker
957 }
958
959 private redistributeQueuedTasks (worker: Worker): void {
960 const workerNodeKey = this.getWorkerNodeKey(worker)
961 while (this.tasksQueueSize(workerNodeKey) > 0) {
962 let targetWorkerNodeKey: number = workerNodeKey
963 let minQueuedTasks = Infinity
964 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
965 if (
966 workerNodeId !== workerNodeKey &&
967 workerNode.usage.tasks.queued === 0
968 ) {
969 targetWorkerNodeKey = workerNodeId
970 break
971 }
972 if (
973 workerNodeId !== workerNodeKey &&
974 workerNode.usage.tasks.queued < minQueuedTasks
975 ) {
976 minQueuedTasks = workerNode.usage.tasks.queued
977 targetWorkerNodeKey = workerNodeId
978 }
979 }
980 this.enqueueTask(
981 targetWorkerNodeKey,
982 this.dequeueTask(workerNodeKey) as Task<Data>
983 )
984 }
985 }
986
987 /**
988 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
989 *
990 * @returns New, completely set up dynamic worker.
991 */
992 protected createAndSetupDynamicWorker (): Worker {
993 const worker = this.createAndSetupWorker()
994 this.registerWorkerMessageListener(worker, message => {
995 const workerNodeKey = this.getWorkerNodeKey(worker)
996 if (
997 isKillBehavior(KillBehaviors.HARD, message.kill) ||
998 (message.kill != null &&
999 ((this.opts.enableTasksQueue === false &&
1000 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
1001 (this.opts.enableTasksQueue === true &&
1002 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
1003 this.tasksQueueSize(workerNodeKey) === 0)))
1004 ) {
1005 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
1006 void (this.destroyWorker(worker) as Promise<void>)
1007 }
1008 })
1009 const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
1010 workerInfo.dynamic = true
1011 this.sendToWorker(worker, {
1012 checkAlive: true,
1013 workerId: workerInfo.id as number
1014 })
1015 return worker
1016 }
1017
1018 /**
1019 * This function is the listener registered for each worker message.
1020 *
1021 * @returns The listener function to execute when a message is received from a worker.
1022 */
1023 protected workerListener (): (message: MessageValue<Response>) => void {
1024 return message => {
1025 this.checkMessageWorkerId(message)
1026 if (message.ready != null && message.workerId != null) {
1027 // Worker ready message received
1028 this.handleWorkerReadyMessage(message)
1029 } else if (message.id != null) {
1030 // Task execution response received
1031 this.handleTaskExecutionResponse(message)
1032 }
1033 }
1034 }
1035
1036 private handleWorkerReadyMessage (message: MessageValue<Response>): void {
1037 const worker = this.getWorkerById(message.workerId)
1038 this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
1039 message.ready as boolean
1040 if (this.emitter != null && this.ready) {
1041 this.emitter.emit(PoolEvents.ready, this.info)
1042 }
1043 }
1044
1045 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1046 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1047 if (promiseResponse != null) {
1048 if (message.taskError != null) {
1049 if (this.emitter != null) {
1050 this.emitter.emit(PoolEvents.taskError, message.taskError)
1051 }
1052 promiseResponse.reject(message.taskError.message)
1053 } else {
1054 promiseResponse.resolve(message.data as Response)
1055 }
1056 this.afterTaskExecutionHook(promiseResponse.worker, message)
1057 this.promiseResponseMap.delete(message.id as string)
1058 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1059 if (
1060 this.opts.enableTasksQueue === true &&
1061 this.tasksQueueSize(workerNodeKey) > 0
1062 ) {
1063 this.executeTask(
1064 workerNodeKey,
1065 this.dequeueTask(workerNodeKey) as Task<Data>
1066 )
1067 }
1068 this.workerChoiceStrategyContext.update(workerNodeKey)
1069 }
1070 }
1071
1072 private checkAndEmitEvents (): void {
1073 if (this.emitter != null) {
1074 if (this.busy) {
1075 this.emitter.emit(PoolEvents.busy, this.info)
1076 }
1077 if (this.type === PoolTypes.dynamic && this.full) {
1078 this.emitter.emit(PoolEvents.full, this.info)
1079 }
1080 }
1081 }
1082
1083 /**
1084 * Gets the worker information.
1085 *
1086 * @param workerNodeKey - The worker node key.
1087 */
1088 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1089 return this.workerNodes[workerNodeKey].info
1090 }
1091
1092 /**
1093 * Pushes the given worker in the pool worker nodes.
1094 *
1095 * @param worker - The worker.
1096 * @returns The worker nodes length.
1097 */
1098 private pushWorkerNode (worker: Worker): number {
1099 return this.workerNodes.push(new WorkerNode(worker, this.worker))
1100 }
1101
1102 /**
1103 * Removes the given worker from the pool worker nodes.
1104 *
1105 * @param worker - The worker.
1106 */
1107 private removeWorkerNode (worker: Worker): void {
1108 const workerNodeKey = this.getWorkerNodeKey(worker)
1109 if (workerNodeKey !== -1) {
1110 this.workerNodes.splice(workerNodeKey, 1)
1111 this.workerChoiceStrategyContext.remove(workerNodeKey)
1112 }
1113 }
1114
1115 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1116 this.beforeTaskExecutionHook(workerNodeKey, task)
1117 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1118 }
1119
1120 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
1121 return this.workerNodes[workerNodeKey].enqueueTask(task)
1122 }
1123
1124 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
1125 return this.workerNodes[workerNodeKey].dequeueTask()
1126 }
1127
1128 private tasksQueueSize (workerNodeKey: number): number {
1129 return this.workerNodes[workerNodeKey].tasksQueueSize()
1130 }
1131
1132 private flushTasksQueue (workerNodeKey: number): void {
1133 while (this.tasksQueueSize(workerNodeKey) > 0) {
1134 this.executeTask(
1135 workerNodeKey,
1136 this.dequeueTask(workerNodeKey) as Task<Data>
1137 )
1138 }
1139 this.workerNodes[workerNodeKey].clearTasksQueue()
1140 }
1141
1142 private flushTasksQueues (): void {
1143 for (const [workerNodeKey] of this.workerNodes.entries()) {
1144 this.flushTasksQueue(workerNodeKey)
1145 }
1146 }
1147
1148 private setWorkerStatistics (worker: Worker): void {
1149 this.sendToWorker(worker, {
1150 statistics: {
1151 runTime:
1152 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1153 .runTime.aggregate,
1154 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1155 .elu.aggregate
1156 },
1157 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
1158 })
1159 }
1160}