WRR: Fix worker choice initial runtime value on each round
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
CommitLineData
b3432a63 1import { cpus } from 'os'
b3432a63 2import type { IPoolInternal } from '../pool-internal'
ea7a90d3 3import type { IPoolWorker } from '../pool-worker'
b3432a63 4import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
10fcfaf4 5import type { RequiredStatistics } from './selection-strategies-types'
b3432a63
JB
6
/**
 * Virtual task runtime bookkeeping entry kept for a single worker.
 */
interface TaskRunTime {
  /** Threshold the accumulated virtual runtime is compared against. */
  weight: number
  /** Virtual task runtime accumulated so far. */
  runTime: number
}
14
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * Every worker gets a weight and a virtual task runtime accumulator. The
 * current worker keeps being chosen while its accumulated virtual runtime is
 * below its weight; once the weight is reached, the strategy moves on to the
 * next worker of the pool and restarts that worker's accumulator from zero.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export class WeightedRoundRobinWorkerChoiceStrategy<
  Worker extends IPoolWorker,
  Data,
  Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly requiredStatistics: RequiredStatistics = {
    runTime: true
  }

  /**
   * Worker index where the current task will be submitted.
   */
  private currentWorkerIndex: number = 0
  /**
   * Default worker weight.
   */
  private readonly defaultWorkerWeight: number
  /**
   * Per worker virtual task runtime map.
   */
  private readonly workersTaskRunTime: Map<Worker, TaskRunTime> = new Map<
  Worker,
  TaskRunTime
  >()

  /**
   * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
   *
   * @param pool The pool instance.
   */
  public constructor (pool: IPoolInternal<Worker, Data, Response>) {
    super(pool)
    this.defaultWorkerWeight = this.computeWorkerWeight()
    this.initWorkersTaskRunTime()
  }

  /**
   * Restarts the round from the first worker and rebuilds the virtual task
   * runtime map from the workers currently in the pool.
   *
   * @returns Always `true`.
   * @inheritDoc
   */
  public reset (): boolean {
    this.currentWorkerIndex = 0
    this.workersTaskRunTime.clear()
    this.initWorkersTaskRunTime()
    return true
  }

  /**
   * Returns the worker at the current index. While that worker's accumulated
   * virtual runtime is below its weight, the accumulator grows by the worker
   * average task runtime; otherwise the index advances (wrapping around) and
   * the accumulator of the newly selected worker is reset to zero.
   *
   * @returns The worker the current task is submitted to.
   * @inheritDoc
   */
  public choose (): Worker {
    const workers = this.pool.workers
    // The pool may hold fewer workers than on the previous call: an index past
    // the end would yield `undefined` and could never wrap around again.
    if (this.currentWorkerIndex >= workers.length) {
      this.currentWorkerIndex = 0
    }
    const chosenWorker = workers[this.currentWorkerIndex]
    // Workers unknown to the map (presumably created after construction in a
    // dynamic pool) are registered on first use.
    if (
      this.isDynamicPool === true &&
      this.workersTaskRunTime.has(chosenWorker) === false
    ) {
      this.initWorkerTaskRunTime(chosenWorker)
    }
    const chosenTaskRunTime = this.workersTaskRunTime.get(chosenWorker)
    const workerTaskRunTime = chosenTaskRunTime?.runTime ?? 0
    const workerTaskWeight =
      chosenTaskRunTime?.weight ?? this.defaultWorkerWeight
    if (workerTaskRunTime < workerTaskWeight) {
      this.setWorkerTaskRunTime(
        chosenWorker,
        workerTaskWeight,
        workerTaskRunTime +
          (this.getWorkerVirtualTaskRunTime(chosenWorker) ?? 0)
      )
    } else {
      const nextWorkerIndex = this.currentWorkerIndex + 1
      this.currentWorkerIndex =
        nextWorkerIndex >= workers.length ? 0 : nextWorkerIndex
      const nextWorker = workers[this.currentWorkerIndex]
      // Keep the next worker's own weight instead of overwriting it with the
      // weight of the worker that just finished its round.
      this.setWorkerTaskRunTime(
        nextWorker,
        this.workersTaskRunTime.get(nextWorker)?.weight ??
          this.defaultWorkerWeight,
        0
      )
    }
    return chosenWorker
  }

  /**
   * Registers every worker currently in the pool with the default weight and
   * a zero virtual task runtime.
   */
  private initWorkersTaskRunTime (): void {
    for (const worker of this.pool.workers) {
      this.initWorkerTaskRunTime(worker)
    }
  }

  /**
   * Registers one worker with the default weight and a zero virtual task runtime.
   *
   * @param worker The worker to register.
   */
  private initWorkerTaskRunTime (worker: Worker): void {
    this.setWorkerTaskRunTime(worker, this.defaultWorkerWeight, 0)
  }

  /**
   * Stores (or replaces) the virtual task runtime entry of a worker.
   *
   * @param worker The worker the entry belongs to.
   * @param weight The worker weight.
   * @param runTime The accumulated virtual task runtime.
   */
  private setWorkerTaskRunTime (
    worker: Worker,
    weight: number,
    runTime: number
  ): void {
    this.workersTaskRunTime.set(worker, {
      weight,
      runTime
    })
  }

  /**
   * Reads the worker average tasks runtime from the pool.
   *
   * @param worker The worker to query.
   * @returns The average tasks runtime, or `undefined` when the pool has none.
   */
  private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined {
    return this.pool.getWorkerAverageTasksRunTime(worker)
  }

  /**
   * Computes the default worker weight as the rounded average, over the host
   * CPUs, of `10^(2 * d) / speed`, where `speed` is the reported CPU speed and
   * `d` its number of digits minus one.
   *
   * CPUs reporting a speed that is not a finite positive number are ignored;
   * when no CPU reports a usable speed, a nominal speed is assumed so the
   * weight is never `Infinity` or `NaN`.
   *
   * @returns The default worker weight.
   */
  private computeWorkerWeight (): number {
    // Nominal CPU speed (in MHz) assumed when the host reports no usable one.
    const fallbackCpuSpeed = 2000
    const usableCpuSpeeds = cpus()
      .map(cpu => cpu.speed)
      .filter(cpuSpeed => Number.isFinite(cpuSpeed) && cpuSpeed > 0)
    const cpuSpeeds =
      usableCpuSpeeds.length > 0 ? usableCpuSpeeds : [fallbackCpuSpeed]
    let cpusCycleTimeWeight = 0
    for (const cpuSpeed of cpuSpeeds) {
      // CPU estimated cycle time
      const numberOfDigits = cpuSpeed.toString().length - 1
      const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
    }
    return Math.round(cpusCycleTimeWeight / cpuSpeeds.length)
  }
}