Tests: assess that internal strategy statistics are reset when the strategy is changed
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
1 import { cpus } from 'os'
2 import type { IPoolInternal } from '../pool-internal'
3 import type { IPoolWorker } from '../pool-worker'
4 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
5 import type { RequiredStatistics } from './selection-strategies-types'
6
/**
 * Virtual task run time bookkeeping kept for one worker.
 */
interface TaskRunTime {
  /** Weight assigned to the worker. */
  weight: number
  /** Virtual run time recorded for the worker. */
  runTime: number
}
14
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * Every worker owns a weight, used as a virtual run time budget. Each time a
 * worker is chosen, its average task run time (as reported by the pool) is added
 * to its accumulated virtual run time. Once the accumulated value reaches the
 * weight, the accumulator is reset and the strategy moves on to the next worker.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export class WeightedRoundRobinWorkerChoiceStrategy<
  Worker extends IPoolWorker,
  Data,
  Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly requiredStatistics: RequiredStatistics = {
    runTime: true
  }

  /**
   * Worker index where the previous task was submitted.
   */
  private previousWorkerIndex: number = 0
  /**
   * Worker index where the current task will be submitted.
   */
  private currentWorkerIndex: number = 0
  /**
   * Default worker weight.
   */
  private defaultWorkerWeight: number
  /**
   * Per worker virtual task runtime map.
   */
  private readonly workersTaskRunTime: Map<Worker, TaskRunTime> = new Map<
  Worker,
  TaskRunTime
  >()

  /**
   * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
   *
   * @param pool The pool instance.
   */
  public constructor (pool: IPoolInternal<Worker, Data, Response>) {
    super(pool)
    this.defaultWorkerWeight = this.computeWorkerWeight()
    this.initWorkersTaskRunTime()
  }

  /** @inheritDoc */
  public resetStatistics (): boolean {
    // Restart the rotation from the first worker so that no stale index
    // survives the reset.
    this.previousWorkerIndex = 0
    this.currentWorkerIndex = 0
    this.workersTaskRunTime.clear()
    this.initWorkersTaskRunTime()
    return true
  }

  /** @inheritDoc */
  public choose (): Worker {
    const workers = this.pool.workers
    // The workers list may have shrunk since the previous call: never index
    // past its end.
    if (this.currentWorkerIndex >= workers.length) {
      this.currentWorkerIndex = 0
    }
    this.previousWorkerIndex = this.currentWorkerIndex
    let chosenWorker = workers[this.currentWorkerIndex]
    let { weight, runTime } = this.getWorkerTaskRunTime(chosenWorker)
    if (runTime >= weight) {
      // Budget exhausted: reset the accumulator of the worker being left and
      // move on to the next one, wrapping around at the end of the list.
      this.setWorkerTaskRunTime(chosenWorker, weight, 0)
      this.currentWorkerIndex =
        this.currentWorkerIndex + 1 < workers.length
          ? this.currentWorkerIndex + 1
          : 0
      chosenWorker = workers[this.currentWorkerIndex]
      weight = this.getWorkerTaskRunTime(chosenWorker).weight
      runTime = 0
    }
    // Charge the task being submitted to the chosen worker. This also creates
    // the bookkeeping entry of a worker seen for the first time.
    this.setWorkerTaskRunTime(
      chosenWorker,
      weight,
      runTime + (this.getWorkerVirtualTaskRunTime(chosenWorker) ?? 0)
    )
    return chosenWorker
  }

  /**
   * Creates a zeroed bookkeeping entry for every worker currently in the pool.
   */
  private initWorkersTaskRunTime (): void {
    for (const worker of this.pool.workers) {
      this.initWorkerTaskRunTime(worker)
    }
  }

  /**
   * Creates a zeroed bookkeeping entry with the default weight for one worker.
   *
   * @param worker The worker to register.
   */
  private initWorkerTaskRunTime (worker: Worker): void {
    this.setWorkerTaskRunTime(worker, this.defaultWorkerWeight, 0)
  }

  /**
   * Stores the weight and accumulated virtual run time of a worker.
   *
   * @param worker The worker whose entry is written.
   * @param weight The worker weight.
   * @param runTime The accumulated virtual run time.
   */
  private setWorkerTaskRunTime (
    worker: Worker,
    weight: number,
    runTime: number
  ): void {
    this.workersTaskRunTime.set(worker, {
      weight,
      runTime
    })
  }

  /**
   * Reads the bookkeeping entry of a worker.
   *
   * @param worker The worker whose entry is read.
   * @returns The stored entry, or a zeroed entry with the default weight when
   * the worker has none yet. The fallback entry is not stored.
   */
  private getWorkerTaskRunTime (worker: Worker): TaskRunTime {
    return (
      this.workersTaskRunTime.get(worker) ?? {
        weight: this.defaultWorkerWeight,
        runTime: 0
      }
    )
  }

  /**
   * Gets the virtual run time charged for one task of the given worker, i.e.
   * the average task run time reported by the pool.
   *
   * @param worker The worker to query.
   * @returns The average task run time, or `undefined` when the pool has none.
   */
  private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined {
    return this.pool.getWorkerAverageTasksRunTime(worker)
  }

  /**
   * Computes the default worker weight from the estimated cycle time of the
   * CPUs reported by the operating system.
   *
   * CPUs whose reported speed is not a positive finite number are ignored, as
   * they would otherwise turn the weight into `Infinity` or `NaN`.
   *
   * @returns The rounded average weight over the usable CPUs, or a fallback
   * weight when no CPU reports a usable speed.
   */
  private computeWorkerWeight (): number {
    // What the formula below yields for a nominal 2500 MHz CPU.
    const fallbackWorkerWeight = 400
    let cpusCycleTimeWeight = 0
    let usableCpus = 0
    for (const cpu of cpus()) {
      if (!Number.isFinite(cpu.speed) || cpu.speed <= 0) {
        continue
      }
      // CPU estimated cycle time
      const numberOfDigit = cpu.speed.toString().length - 1
      const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigit))
      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigit)
      usableCpus++
    }
    return usableCpus > 0
      ? Math.round(cpusCycleTimeWeight / usableCpus)
      : fallbackWorkerWeight
  }
}