feat: add pool runtime setters
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
CommitLineData
fc3e6586 1import { cpus } from 'node:os'
f06e48d8 2import type { IWorker } from '../worker'
b3432a63 3import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
bf90656c
JB
4import type {
5 IWorkerChoiceStrategy,
da309861
JB
6 RequiredStatistics,
7 WorkerChoiceStrategyOptions
bf90656c 8} from './selection-strategies-types'
c4855468 9import type { IPool } from '../pool'
2fc5cae3 10import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
b3432a63
JB
11
/**
 * Virtual task runtime bookkeeping entry, stored per worker node key.
 */
interface TaskRunTime {
  /**
   * Virtual run time accumulated by the worker node since its record was last reset.
   */
  runTime: number
  /**
   * Weight of the worker node: the accumulated run time is compared against it
   * to decide when the strategy moves on to the next worker node.
   */
  weight: number
}
19
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * Each worker node accumulates a virtual task run time (its average or median
 * task run time, added once per selection). The same worker node keeps being
 * chosen until that accumulated run time reaches its weight, then the strategy
 * moves on to the next worker node.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 */
export class WeightedRoundRobinWorkerChoiceStrategy<
    Worker extends IWorker,
    Data = unknown,
    Response = unknown
  >
  extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
  implements IWorkerChoiceStrategy {
  /** @inheritDoc */
  public readonly requiredStatistics: RequiredStatistics = {
    runTime: true,
    avgRunTime: true,
    medRunTime: false
  }

  /**
   * Worker node id where the current task will be submitted.
   */
  private currentWorkerNodeId: number = 0
  /**
   * Default worker weight.
   */
  private readonly defaultWorkerWeight: number
  /**
   * Workers' virtual task runtime, keyed by worker node key.
   */
  private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
  number,
  TaskRunTime
  >()

  /** @inheritDoc */
  public constructor (
    pool: IPool<Worker, Data, Response>,
    opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  ) {
    super(pool, opts)
    this.checkOptions(this.opts)
    this.defaultWorkerWeight = this.computeWorkerWeight()
    this.initWorkersTaskRunTime()
  }

  /** @inheritDoc */
  public reset (): boolean {
    this.currentWorkerNodeId = 0
    this.workersTaskRunTime.clear()
    this.initWorkersTaskRunTime()
    return true
  }

  /** @inheritDoc */
  public choose (): number {
    const chosenWorkerNodeKey = this.currentWorkerNodeId
    if (
      this.isDynamicPool &&
      !this.workersTaskRunTime.has(chosenWorkerNodeKey)
    ) {
      // Dynamic pools can grow after construction: lazily create the record.
      this.initWorkerTaskRunTime(chosenWorkerNodeKey)
    }
    const chosenTaskRunTime = this.workersTaskRunTime.get(chosenWorkerNodeKey)
    const workerTaskRunTime = chosenTaskRunTime?.runTime ?? 0
    const workerTaskWeight =
      chosenTaskRunTime?.weight ?? this.defaultWorkerWeight
    if (workerTaskRunTime < workerTaskWeight) {
      // Weight not yet consumed: stay on this worker node and account for one
      // more virtual task run time.
      this.setWorkerTaskRunTime(
        chosenWorkerNodeKey,
        workerTaskWeight,
        workerTaskRunTime +
          (this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey) ?? 0)
      )
    } else {
      // Weight consumed: advance to the next worker node for the following
      // call. `>=` (rather than `===`) also wraps a cursor that ended up past
      // the last worker node.
      this.currentWorkerNodeId =
        this.currentWorkerNodeId >= this.pool.workerNodes.length - 1
          ? 0
          : this.currentWorkerNodeId + 1
      // Restart the next worker node's accounting, keeping its own weight.
      const nextWorkerTaskWeight =
        this.workersTaskRunTime.get(this.currentWorkerNodeId)?.weight ??
        this.defaultWorkerWeight
      this.setWorkerTaskRunTime(
        this.currentWorkerNodeId,
        nextWorkerTaskWeight,
        0
      )
    }
    return chosenWorkerNodeKey
  }

  /** @inheritDoc */
  public remove (workerNodeKey: number): boolean {
    // NOTE(review): assumes the worker node has already been removed from
    // `pool.workerNodes` when this is called - confirm against the caller.
    const lastWorkerNodeKey = this.pool.workerNodes.length - 1
    if (lastWorkerNodeKey < 0) {
      this.currentWorkerNodeId = 0
    } else if (this.currentWorkerNodeId > lastWorkerNodeKey) {
      // Clamp whichever worker node was removed, so the cursor never points
      // past the end of the pool.
      this.currentWorkerNodeId = lastWorkerNodeKey
    }
    const deleted = this.workersTaskRunTime.delete(workerNodeKey)
    // Shift every record above the removed key down by one. The entries are
    // snapshotted and processed in ascending key order so that the map is not
    // mutated while being iterated and no stale record is left at the old
    // highest key.
    const entriesToShift = Array.from(this.workersTaskRunTime.entries())
      .filter(([key]) => key > workerNodeKey)
      .sort(([keyA], [keyB]) => keyA - keyB)
    for (const [key, value] of entriesToShift) {
      this.workersTaskRunTime.delete(key)
      this.workersTaskRunTime.set(key - 1, value)
    }
    return deleted
  }

  /**
   * Creates a fresh task runtime record for every worker node currently in the pool.
   */
  private initWorkersTaskRunTime (): void {
    for (const [index] of this.pool.workerNodes.entries()) {
      this.initWorkerTaskRunTime(index)
    }
  }

  /**
   * Creates a fresh task runtime record (default weight, zero run time) for a worker node.
   *
   * @param workerNodeKey - The worker node key.
   */
  private initWorkerTaskRunTime (workerNodeKey: number): void {
    this.setWorkerTaskRunTime(workerNodeKey, this.defaultWorkerWeight, 0)
  }

  /**
   * Stores the task runtime record of a worker node, replacing any previous one.
   *
   * @param workerNodeKey - The worker node key.
   * @param weight - The worker node weight.
   * @param runTime - The accumulated virtual task run time.
   */
  private setWorkerTaskRunTime (
    workerNodeKey: number,
    weight: number,
    runTime: number
  ): void {
    this.workersTaskRunTime.set(workerNodeKey, {
      weight,
      runTime
    })
  }

  /**
   * Gets the virtual task run time of a worker node: its median task run time
   * when the median statistic is required, its average task run time otherwise.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The worker node virtual task run time.
   */
  private getWorkerVirtualTaskRunTime (workerNodeKey: number): number {
    return this.requiredStatistics.medRunTime
      ? this.pool.workerNodes[workerNodeKey].tasksUsage.medRunTime
      : this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime
  }

  /**
   * Computes the default worker weight from the estimated cycle time of the
   * host CPUs, averaged over the CPUs that report a usable speed.
   *
   * @returns The default worker weight, always a finite number.
   */
  private computeWorkerWeight (): number {
    // Used when no CPU reports a usable speed; equals what a 2500 MHz CPU
    // yields with the formula below (10^6 / 2500).
    const fallbackWorkerWeight = 400
    let cpusCycleTimeWeight = 0
    let measuredCpus = 0
    for (const cpu of cpus()) {
      // Skip CPUs reporting a zero or invalid speed: they would turn the
      // weight into Infinity or NaN.
      if (!(cpu.speed > 0)) {
        continue
      }
      // CPU estimated cycle time
      const numberOfDigits = cpu.speed.toString().length - 1
      const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
      ++measuredCpus
    }
    return measuredCpus === 0
      ? fallbackWorkerWeight
      : Math.round(cpusCycleTimeWeight / measuredCpus)
  }
}