ed8690918f6177656020d84a915f11de85023aca
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
1 import { cpus } from 'os'
2 import type { AbstractPoolWorker } from '../abstract-pool-worker'
3 import type { IPoolInternal } from '../pool-internal'
4 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
5 import type { RequiredStatistics } from './selection-strategies-types'
6
7 /**
8 * Virtual task runtime.
9 */
10 type TaskRunTime = {
11 weight: number
12 runTime: number
13 }
14
15 /**
16 * Selects the next worker with a weighted round robin scheduling algorithm.
17 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
18 *
19 * @template Worker Type of worker which manages the strategy.
20 * @template Data Type of data sent to the worker. This can only be serializable data.
21 * @template Response Type of response of execution. This can only be serializable data.
22 */
23 export class WeightedRoundRobinWorkerChoiceStrategy<
24 Worker extends AbstractPoolWorker,
25 Data,
26 Response
27 > extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
28 /** @inheritDoc */
29 public requiredStatistics: RequiredStatistics = {
30 runTime: true
31 }
32
33 /**
34 * Worker index where the previous task was submitted.
35 */
36 private previousWorkerIndex: number = 0
37 /**
38 * Worker index where the current task will be submitted.
39 */
40 private currentWorkerIndex: number = 0
41 /**
42 * Default worker weight.
43 */
44 private defaultWorkerWeight: number
45 /**
46 * Per worker virtual task runtime map.
47 */
48 private workersTaskRunTime: Map<Worker, TaskRunTime> = new Map<
49 Worker,
50 TaskRunTime
51 >()
52
53 /**
54 * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
55 *
56 * @param pool The pool instance.
57 */
58 public constructor (pool: IPoolInternal<Worker, Data, Response>) {
59 super(pool)
60 this.defaultWorkerWeight = this.computeWorkerWeight()
61 this.initWorkersTaskRunTime()
62 }
63
64 /** @inheritDoc */
65 public choose (): Worker {
66 const currentWorker = this.pool.workers[this.currentWorkerIndex]
67 if (
68 this.isDynamicPool === true &&
69 this.workersTaskRunTime.has(currentWorker) === false
70 ) {
71 this.initWorkerTaskRunTime(currentWorker)
72 }
73 const workerVirtualTaskRunTime =
74 this.getWorkerVirtualTaskRunTime(currentWorker) ?? 0
75 const workerTaskWeight =
76 this.workersTaskRunTime.get(currentWorker)?.weight ??
77 this.defaultWorkerWeight
78 if (this.currentWorkerIndex === this.previousWorkerIndex) {
79 const workerTaskRunTime =
80 (this.workersTaskRunTime.get(currentWorker)?.runTime ?? 0) +
81 workerVirtualTaskRunTime
82 this.setWorkerTaskRunTime(
83 currentWorker,
84 workerTaskWeight,
85 workerTaskRunTime
86 )
87 } else {
88 this.setWorkerTaskRunTime(currentWorker, workerTaskWeight, 0)
89 }
90 if (workerVirtualTaskRunTime < workerTaskWeight) {
91 this.previousWorkerIndex = this.currentWorkerIndex
92 } else {
93 this.previousWorkerIndex = this.currentWorkerIndex
94 this.currentWorkerIndex =
95 this.pool.workers.length - 1 === this.currentWorkerIndex
96 ? 0
97 : this.currentWorkerIndex + 1
98 }
99 return this.pool.workers[this.currentWorkerIndex]
100 }
101
102 private initWorkersTaskRunTime (): void {
103 for (const worker of this.pool.workers) {
104 this.initWorkerTaskRunTime(worker)
105 }
106 }
107
108 private initWorkerTaskRunTime (worker: Worker): void {
109 this.setWorkerTaskRunTime(worker, this.defaultWorkerWeight, 0)
110 }
111
112 private setWorkerTaskRunTime (
113 worker: Worker,
114 weight: number,
115 runTime: number
116 ): void {
117 this.workersTaskRunTime.set(worker, {
118 weight,
119 runTime
120 })
121 }
122
123 private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined {
124 return this.pool.getWorkerAverageTasksRunTime(worker)
125 }
126
127 private computeWorkerWeight (): number {
128 let cpusCycleTimeWeight = 0
129 for (const cpu of cpus()) {
130 // CPU estimated cycle time
131 const numberOfDigit = cpu.speed.toString().length - 1
132 const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigit))
133 cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigit)
134 }
135 return Math.round(cpusCycleTimeWeight / cpus().length)
136 }
137 }