feat: add median task run time statistic
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
CommitLineData
fc3e6586 1import { cpus } from 'node:os'
b3432a63 2import type { IPoolInternal } from '../pool-internal'
ea7a90d3 3import type { IPoolWorker } from '../pool-worker'
b3432a63 4import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
bf90656c
JB
5import type {
6 IWorkerChoiceStrategy,
7 RequiredStatistics
8} from './selection-strategies-types'
b3432a63
JB
9
/**
 * Virtual task runtime.
 */
interface TaskRunTime {
  /**
   * Worker weight: the strategy keeps submitting tasks to a worker while its
   * accumulated `runTime` is strictly lower than this value.
   */
  weight: number
  /**
   * Virtual run time accumulated by the worker during its current turn.
   * It is reset to 0 when the worker becomes the current worker.
   */
  runTime: number
}
17
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * Each worker accumulates a virtual task run time (its average task run time
 * is added on every selection). Once that virtual run time reaches the worker
 * weight, the strategy moves on to the next worker.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of response of execution. This can only be serializable data.
 */
export class WeightedRoundRobinWorkerChoiceStrategy<
    Worker extends IPoolWorker,
    Data = unknown,
    Response = unknown
  >
  extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
  implements IWorkerChoiceStrategy {
  /** @inheritDoc */
  public readonly requiredStatistics: RequiredStatistics = {
    runTime: true,
    avgRunTime: true,
    medRunTime: false
  }

  /**
   * Worker id where the current task will be submitted.
   */
  private currentWorkerId: number = 0
  /**
   * Default worker weight.
   */
  private readonly defaultWorkerWeight: number
  /**
   * Per worker virtual task runtime map.
   */
  private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
  number,
  TaskRunTime
  >()

  /**
   * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
   *
   * @param pool - The pool instance.
   */
  public constructor (pool: IPoolInternal<Worker, Data, Response>) {
    super(pool)
    this.defaultWorkerWeight = this.computeWorkerWeight()
    this.initWorkersTaskRunTime()
  }

  /** @inheritDoc */
  public reset (): boolean {
    this.currentWorkerId = 0
    this.workersTaskRunTime.clear()
    this.initWorkersTaskRunTime()
    return true
  }

  /** @inheritDoc */
  public choose (): number {
    const chosenWorkerKey = this.currentWorkerId
    // In a dynamic pool a worker may have been added after the strategy was
    // initialized: create its virtual task runtime entry on first use.
    if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorkerKey)) {
      this.initWorkerTaskRunTime(chosenWorkerKey)
    }
    const chosenTaskRunTime = this.workersTaskRunTime.get(chosenWorkerKey)
    const workerTaskRunTime = chosenTaskRunTime?.runTime ?? 0
    const workerTaskWeight =
      chosenTaskRunTime?.weight ?? this.defaultWorkerWeight
    if (workerTaskRunTime < workerTaskWeight) {
      // The worker still has budget: charge it its average task run time.
      this.setWorkerTaskRunTime(
        chosenWorkerKey,
        workerTaskWeight,
        workerTaskRunTime +
          (this.getWorkerVirtualTaskRunTime(chosenWorkerKey) ?? 0)
      )
    } else {
      // Budget exhausted: the next call will use the next worker (wrapping
      // around at the end of the pool), which starts with a fresh run time.
      this.currentWorkerId =
        this.currentWorkerId === this.pool.workers.length - 1
          ? 0
          : this.currentWorkerId + 1
      this.setWorkerTaskRunTime(this.currentWorkerId, workerTaskWeight, 0)
    }
    return chosenWorkerKey
  }

  /** @inheritDoc */
  public remove (workerKey: number): boolean {
    if (this.currentWorkerId === workerKey) {
      if (this.pool.workers.length === 0) {
        this.currentWorkerId = 0
      } else {
        // Clamp the current worker id to the last valid worker index.
        this.currentWorkerId =
          this.currentWorkerId > this.pool.workers.length - 1
            ? this.pool.workers.length - 1
            : this.currentWorkerId
      }
    }
    const workerDeleted = this.workersTaskRunTime.delete(workerKey)
    // Shift the entries located after the removed worker down by one key.
    // The map is rebuilt from a snapshot so that it is not mutated while being
    // iterated and so that no stale entry is left behind at the highest key.
    const remainingEntries = [...this.workersTaskRunTime.entries()]
    this.workersTaskRunTime.clear()
    for (const [key, value] of remainingEntries) {
      this.workersTaskRunTime.set(key > workerKey ? key - 1 : key, value)
    }
    return workerDeleted
  }

  /**
   * Initializes the virtual task runtime entry of every worker in the pool.
   */
  private initWorkersTaskRunTime (): void {
    for (const [index] of this.pool.workers.entries()) {
      this.initWorkerTaskRunTime(index)
    }
  }

  /**
   * Initializes a worker virtual task runtime entry with the default weight
   * and a zero run time.
   *
   * @param workerKey - The worker key.
   */
  private initWorkerTaskRunTime (workerKey: number): void {
    this.setWorkerTaskRunTime(workerKey, this.defaultWorkerWeight, 0)
  }

  /**
   * Sets (or replaces) a worker virtual task runtime entry.
   *
   * @param workerKey - The worker key.
   * @param weight - The worker weight.
   * @param runTime - The worker virtual task run time.
   */
  private setWorkerTaskRunTime (
    workerKey: number,
    weight: number,
    runTime: number
  ): void {
    this.workersTaskRunTime.set(workerKey, {
      weight,
      runTime
    })
  }

  /**
   * Gets the run time charged to a worker each time it is chosen: its average
   * task run time.
   *
   * @param workerKey - The worker key.
   * @returns The worker average task run time.
   */
  private getWorkerVirtualTaskRunTime (workerKey: number): number {
    return this.pool.workers[workerKey].tasksUsage.avgRunTime
  }

  /**
   * Computes the default worker weight as the rounded average of the CPUs
   * estimated cycle time.
   *
   * CPUs reporting a speed that is not a positive finite number are ignored,
   * since they would otherwise yield an infinite or NaN weight.
   *
   * @returns The default worker weight, or 0 when no CPU reports a usable
   * speed. With a weight of 0, `choose` moves on to the next worker on every
   * call.
   */
  private computeWorkerWeight (): number {
    let cpusCycleTimeWeight = 0
    let usableCpusCount = 0
    for (const cpu of cpus()) {
      if (!(cpu.speed > 0 && Number.isFinite(cpu.speed))) {
        continue
      }
      // CPU estimated cycle time
      const numberOfDigits = cpu.speed.toString().length - 1
      const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
      usableCpusCount++
    }
    if (usableCpusCount === 0) {
      return 0
    }
    return Math.round(cpusCycleTimeWeight / usableCpusCount)
  }
}