Flag some attributes as readonly
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
CommitLineData
b3432a63 1import { cpus } from 'os'
b3432a63 2import type { IPoolInternal } from '../pool-internal'
ea7a90d3 3import type { IPoolWorker } from '../pool-worker'
b3432a63 4import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
10fcfaf4 5import type { RequiredStatistics } from './selection-strategies-types'
b3432a63
JB
6
/**
 * Virtual task runtime bookkeeping entry kept for a single worker.
 */
interface TaskRunTime {
  /** Weight assigned to the worker. */
  weight: number
  /** Run time value currently stored for the worker. */
  runTime: number
}
14
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * Each worker owns a weight and an accumulated virtual task run time. The same
 * worker keeps being selected until its accumulated run time reaches its
 * weight, then the selection moves on to the next worker in the pool.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export class WeightedRoundRobinWorkerChoiceStrategy<
  Worker extends IPoolWorker,
  Data,
  Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly requiredStatistics: RequiredStatistics = {
    runTime: true
  }

  /**
   * Worker index where the previous task was submitted.
   */
  private previousWorkerIndex = 0
  /**
   * Worker index where the current task will be submitted.
   */
  private currentWorkerIndex = 0
  /**
   * Default worker weight.
   */
  private readonly defaultWorkerWeight: number
  /**
   * Per worker virtual task runtime map.
   */
  private readonly workersTaskRunTime: Map<Worker, TaskRunTime> = new Map<
  Worker,
  TaskRunTime
  >()

  /**
   * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
   *
   * @param pool The pool instance.
   */
  public constructor (pool: IPoolInternal<Worker, Data, Response>) {
    super(pool)
    this.defaultWorkerWeight = this.computeWorkerWeight()
    this.initWorkersTaskRunTime()
  }

  /** @inheritDoc */
  public reset (): boolean {
    this.previousWorkerIndex = 0
    this.currentWorkerIndex = 0
    this.workersTaskRunTime.clear()
    this.initWorkersTaskRunTime()
    return true
  }

  /** @inheritDoc */
  public choose (): Worker {
    const currentWorker = this.pool.workers[this.currentWorkerIndex]
    // Workers created after construction (dynamic pool) have no entry yet.
    if (
      this.isDynamicPool === true &&
      !this.workersTaskRunTime.has(currentWorker)
    ) {
      this.initWorkerTaskRunTime(currentWorker)
    }
    const workerVirtualTaskRunTime =
      this.getWorkerVirtualTaskRunTime(currentWorker) ?? 0
    const workerEntry = this.workersTaskRunTime.get(currentWorker)
    const workerTaskWeight = workerEntry?.weight ?? this.defaultWorkerWeight
    // Keep accumulating while the selection stays on the same worker,
    // start over from zero when the selection has just moved to this worker.
    const workerTaskRunTime =
      this.currentWorkerIndex === this.previousWorkerIndex
        ? (workerEntry?.runTime ?? 0) + workerVirtualTaskRunTime
        : 0
    this.previousWorkerIndex = this.currentWorkerIndex
    // The decision is taken on the accumulated run time, not on the per task
    // average: comparing the average would either pin the selection on one
    // worker forever or rotate on every single call.
    if (workerTaskRunTime < workerTaskWeight) {
      this.setWorkerTaskRunTime(
        currentWorker,
        workerTaskWeight,
        workerTaskRunTime
      )
    } else {
      // The worker share is used up: reset its accounting and move on to the
      // next worker, wrapping around at the end of the pool.
      this.setWorkerTaskRunTime(currentWorker, workerTaskWeight, 0)
      this.currentWorkerIndex =
        this.pool.workers.length - 1 === this.currentWorkerIndex
          ? 0
          : this.currentWorkerIndex + 1
    }
    return this.pool.workers[this.currentWorkerIndex]
  }

  /**
   * Creates a map entry (default weight, zero run time) for every worker
   * currently in the pool.
   */
  private initWorkersTaskRunTime (): void {
    for (const worker of this.pool.workers) {
      this.initWorkerTaskRunTime(worker)
    }
  }

  /**
   * Creates or resets the map entry of a worker with the default weight and
   * a zero run time.
   *
   * @param worker The worker to initialize.
   */
  private initWorkerTaskRunTime (worker: Worker): void {
    this.setWorkerTaskRunTime(worker, this.defaultWorkerWeight, 0)
  }

  /**
   * Stores the weight and run time of a worker, replacing any previous entry.
   *
   * @param worker The worker to update.
   * @param weight The worker weight.
   * @param runTime The worker accumulated run time.
   */
  private setWorkerTaskRunTime (
    worker: Worker,
    weight: number,
    runTime: number
  ): void {
    this.workersTaskRunTime.set(worker, {
      weight,
      runTime
    })
  }

  /**
   * Gets the virtual task run time of a worker, i.e. the value returned by the
   * pool `getWorkerAverageTasksRunTime()` method.
   *
   * @param worker The worker.
   * @returns The value reported by the pool, possibly `undefined`.
   */
  private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined {
    return this.pool.getWorkerAverageTasksRunTime(worker)
  }

  /**
   * Computes the default worker weight from the speed reported for each CPU.
   *
   * For each CPU, with `d` the number of digits of its speed minus one, the
   * contribution is `10^(2d) / speed`; the weight is the rounded mean of the
   * contributions.
   *
   * CPUs reporting a non positive speed are ignored since they would make the
   * weight infinite. If no CPU is usable, `0` is returned: no accumulated run
   * time is ever below such a weight, so the strategy moves to the next worker
   * on every call (plain round robin).
   *
   * @returns The default worker weight.
   */
  private computeWorkerWeight (): number {
    let cpusCycleTimeWeight = 0
    let usableCpus = 0
    for (const cpu of cpus()) {
      if (!(cpu.speed > 0)) {
        continue
      }
      // CPU estimated cycle time
      const numberOfDigit = cpu.speed.toString().length - 1
      const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigit))
      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigit)
      usableCpus++
    }
    return usableCpus === 0
      ? 0
      : Math.round(cpusCycleTimeWeight / usableCpus)
  }
}