fix: ensure worker removal impact is propated to worker choice strategy
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
CommitLineData
fc3e6586 1import { cpus } from 'node:os'
b3432a63 2import type { IPoolInternal } from '../pool-internal'
ea7a90d3 3import type { IPoolWorker } from '../pool-worker'
b3432a63 4import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
10fcfaf4 5import type { RequiredStatistics } from './selection-strategies-types'
b3432a63
JB
6
7/**
23135a89 8 * Virtual task runtime.
b3432a63 9 */
78cea37e 10interface TaskRunTime {
b3432a63
JB
11 weight: number
12 runTime: number
13}
14
15/**
16 * Selects the next worker with a weighted round robin scheduling algorithm.
17 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
18 *
38e795c1
JB
19 * @typeParam Worker - Type of worker which manages the strategy.
20 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
21 * @typeParam Response - Type of response of execution. This can only be serializable data.
b3432a63
JB
22 */
23export class WeightedRoundRobinWorkerChoiceStrategy<
ea7a90d3 24 Worker extends IPoolWorker,
b3432a63
JB
25 Data,
26 Response
27> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
38e795c1 28 /** {@inheritDoc} */
ea7a90d3 29 public readonly requiredStatistics: RequiredStatistics = {
10fcfaf4
JB
30 runTime: true
31 }
32
b3432a63 33 /**
ffcbbad8 34 * Worker id where the current task will be submitted.
b3432a63 35 */
ffcbbad8 36 private currentWorkerId: number = 0
b3432a63
JB
37 /**
38 * Default worker weight.
39 */
777af0ac 40 private readonly defaultWorkerWeight: number
b3432a63 41 /**
2377984d 42 * Per worker virtual task runtime map.
b3432a63 43 */
c923ce56
JB
44 private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
45 number,
78cea37e 46 TaskRunTime
b3432a63
JB
47 >()
48
49 /**
23ff945a 50 * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
b3432a63 51 *
38e795c1 52 * @param pool - The pool instance.
b3432a63
JB
53 */
54 public constructor (pool: IPoolInternal<Worker, Data, Response>) {
55 super(pool)
56 this.defaultWorkerWeight = this.computeWorkerWeight()
2377984d 57 this.initWorkersTaskRunTime()
b3432a63
JB
58 }
59
38e795c1 60 /** {@inheritDoc} */
a6f7f1b4 61 public reset (): boolean {
ffcbbad8 62 this.currentWorkerId = 0
ea7a90d3
JB
63 this.workersTaskRunTime.clear()
64 this.initWorkersTaskRunTime()
65 return true
66 }
67
38e795c1 68 /** {@inheritDoc} */
c923ce56
JB
69 public choose (): number {
70 const chosenWorkerKey = this.currentWorkerId
71 if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorkerKey)) {
72 this.initWorkerTaskRunTime(chosenWorkerKey)
b3432a63 73 }
d8a610ca 74 const workerTaskRunTime =
c923ce56 75 this.workersTaskRunTime.get(chosenWorkerKey)?.runTime ?? 0
b3432a63 76 const workerTaskWeight =
c923ce56 77 this.workersTaskRunTime.get(chosenWorkerKey)?.weight ??
b3432a63 78 this.defaultWorkerWeight
553ad720 79 if (workerTaskRunTime < workerTaskWeight) {
2377984d 80 this.setWorkerTaskRunTime(
c923ce56 81 chosenWorkerKey,
2377984d 82 workerTaskWeight,
553ad720 83 workerTaskRunTime +
c923ce56 84 (this.getWorkerVirtualTaskRunTime(chosenWorkerKey) ?? 0)
2377984d 85 )
b3432a63 86 } else {
ffcbbad8 87 this.currentWorkerId =
e65c6cd9 88 this.currentWorkerId === this.pool.workers.length - 1
b3432a63 89 ? 0
ffcbbad8 90 : this.currentWorkerId + 1
c923ce56 91 this.setWorkerTaskRunTime(this.currentWorkerId, workerTaskWeight, 0)
b3432a63 92 }
c923ce56 93 return chosenWorkerKey
b3432a63
JB
94 }
95
97a2abc3
JB
96 /** {@inheritDoc} */
97 public remove (workerKey: number): boolean {
98 if (this.currentWorkerId === workerKey) {
99 this.currentWorkerId =
100 this.currentWorkerId > this.pool.workers.length - 1
101 ? this.pool.workers.length - 1
102 : this.currentWorkerId
103 }
104 const workerDeleted = this.workersTaskRunTime.delete(workerKey)
105 for (const [key, value] of this.workersTaskRunTime) {
106 if (key > workerKey) {
107 this.workersTaskRunTime.set(key - 1, value)
108 }
109 }
110 return workerDeleted
111 }
112
2377984d 113 private initWorkersTaskRunTime (): void {
c923ce56
JB
114 for (const [index] of this.pool.workers.entries()) {
115 this.initWorkerTaskRunTime(index)
2377984d
JB
116 }
117 }
118
c923ce56
JB
119 private initWorkerTaskRunTime (workerKey: number): void {
120 this.setWorkerTaskRunTime(workerKey, this.defaultWorkerWeight, 0)
2377984d
JB
121 }
122
123 private setWorkerTaskRunTime (
c923ce56 124 workerKey: number,
2377984d
JB
125 weight: number,
126 runTime: number
127 ): void {
c923ce56 128 this.workersTaskRunTime.set(workerKey, {
2377984d
JB
129 weight,
130 runTime
131 })
132 }
133
c923ce56
JB
134 private getWorkerVirtualTaskRunTime (workerKey: number): number {
135 return this.pool.workers[workerKey].tasksUsage.avgRunTime
2377984d
JB
136 }
137
138 private computeWorkerWeight (): number {
b3432a63 139 let cpusCycleTimeWeight = 0
a59e741b 140 for (const cpu of cpus()) {
b3432a63 141 // CPU estimated cycle time
d8a610ca
JB
142 const numberOfDigits = cpu.speed.toString().length - 1
143 const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
144 cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
b3432a63 145 }
7b0d35b8 146 return Math.round(cpusCycleTimeWeight / cpus().length)
b3432a63 147 }
b3432a63 148}