refactor: explicity extends Task for MessageValue type
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
CommitLineData
fc3e6586 1import { cpus } from 'node:os'
f06e48d8 2import type { IWorker } from '../worker'
b3432a63 3import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
bf90656c
JB
4import type {
5 IWorkerChoiceStrategy,
da309861
JB
6 RequiredStatistics,
7 WorkerChoiceStrategyOptions
bf90656c 8} from './selection-strategies-types'
c4855468 9import type { IPool } from '../pool'
b3432a63
JB
10
11/**
23135a89 12 * Virtual task runtime.
b3432a63 13 */
78cea37e 14interface TaskRunTime {
b3432a63
JB
15 weight: number
16 runTime: number
17}
18
19/**
20 * Selects the next worker with a weighted round robin scheduling algorithm.
21 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
22 *
38e795c1
JB
23 * @typeParam Worker - Type of worker which manages the strategy.
24 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 25 * @typeParam Response - Type of execution response. This can only be serializable data.
b3432a63
JB
26 */
27export class WeightedRoundRobinWorkerChoiceStrategy<
f06e48d8 28 Worker extends IWorker,
b2b1d84e
JB
29 Data = unknown,
30 Response = unknown
bf90656c
JB
31 >
32 extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
17393ac8 33 implements IWorkerChoiceStrategy {
afc003b2 34 /** @inheritDoc */
ea7a90d3 35 public readonly requiredStatistics: RequiredStatistics = {
c6bd2650 36 runTime: true,
78099a15
JB
37 avgRunTime: true,
38 medRunTime: false
10fcfaf4
JB
39 }
40
b3432a63 41 /**
f06e48d8 42 * Worker node id where the current task will be submitted.
b3432a63 43 */
f06e48d8 44 private currentWorkerNodeId: number = 0
b3432a63
JB
45 /**
46 * Default worker weight.
47 */
777af0ac 48 private readonly defaultWorkerWeight: number
b3432a63 49 /**
f06e48d8 50 * Workers' virtual task runtime.
b3432a63 51 */
c923ce56
JB
52 private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
53 number,
78cea37e 54 TaskRunTime
b3432a63
JB
55 >()
56
57 /**
23ff945a 58 * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
b3432a63 59 *
38e795c1 60 * @param pool - The pool instance.
da309861 61 * @param opts - The worker choice strategy options.
b3432a63 62 */
da309861 63 public constructor (
c4855468 64 pool: IPool<Worker, Data, Response>,
da309861
JB
65 opts?: WorkerChoiceStrategyOptions
66 ) {
67 super(pool, opts)
b3432a63 68 this.defaultWorkerWeight = this.computeWorkerWeight()
2377984d 69 this.initWorkersTaskRunTime()
b3432a63
JB
70 }
71
afc003b2 72 /** @inheritDoc */
a6f7f1b4 73 public reset (): boolean {
f06e48d8 74 this.currentWorkerNodeId = 0
ea7a90d3
JB
75 this.workersTaskRunTime.clear()
76 this.initWorkersTaskRunTime()
77 return true
78 }
79
afc003b2 80 /** @inheritDoc */
c923ce56 81 public choose (): number {
f06e48d8
JB
82 const chosenWorkerNodeKey = this.currentWorkerNodeId
83 if (
84 this.isDynamicPool &&
85 !this.workersTaskRunTime.has(chosenWorkerNodeKey)
86 ) {
87 this.initWorkerTaskRunTime(chosenWorkerNodeKey)
b3432a63 88 }
d8a610ca 89 const workerTaskRunTime =
f06e48d8 90 this.workersTaskRunTime.get(chosenWorkerNodeKey)?.runTime ?? 0
b3432a63 91 const workerTaskWeight =
f06e48d8 92 this.workersTaskRunTime.get(chosenWorkerNodeKey)?.weight ??
b3432a63 93 this.defaultWorkerWeight
553ad720 94 if (workerTaskRunTime < workerTaskWeight) {
2377984d 95 this.setWorkerTaskRunTime(
f06e48d8 96 chosenWorkerNodeKey,
2377984d 97 workerTaskWeight,
553ad720 98 workerTaskRunTime +
f06e48d8 99 (this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey) ?? 0)
2377984d 100 )
b3432a63 101 } else {
f06e48d8
JB
102 this.currentWorkerNodeId =
103 this.currentWorkerNodeId === this.pool.workerNodes.length - 1
b3432a63 104 ? 0
f06e48d8
JB
105 : this.currentWorkerNodeId + 1
106 this.setWorkerTaskRunTime(this.currentWorkerNodeId, workerTaskWeight, 0)
b3432a63 107 }
f06e48d8 108 return chosenWorkerNodeKey
b3432a63
JB
109 }
110
afc003b2 111 /** @inheritDoc */
f06e48d8
JB
112 public remove (workerNodeKey: number): boolean {
113 if (this.currentWorkerNodeId === workerNodeKey) {
114 if (this.pool.workerNodes.length === 0) {
115 this.currentWorkerNodeId = 0
78ab2555 116 } else {
f06e48d8
JB
117 this.currentWorkerNodeId =
118 this.currentWorkerNodeId > this.pool.workerNodes.length - 1
119 ? this.pool.workerNodes.length - 1
120 : this.currentWorkerNodeId
78ab2555 121 }
97a2abc3 122 }
f06e48d8 123 const deleted = this.workersTaskRunTime.delete(workerNodeKey)
97a2abc3 124 for (const [key, value] of this.workersTaskRunTime) {
f06e48d8 125 if (key > workerNodeKey) {
97a2abc3
JB
126 this.workersTaskRunTime.set(key - 1, value)
127 }
128 }
f06e48d8 129 return deleted
97a2abc3
JB
130 }
131
2377984d 132 private initWorkersTaskRunTime (): void {
f06e48d8 133 for (const [index] of this.pool.workerNodes.entries()) {
c923ce56 134 this.initWorkerTaskRunTime(index)
2377984d
JB
135 }
136 }
137
f06e48d8
JB
138 private initWorkerTaskRunTime (workerNodeKey: number): void {
139 this.setWorkerTaskRunTime(workerNodeKey, this.defaultWorkerWeight, 0)
2377984d
JB
140 }
141
142 private setWorkerTaskRunTime (
f06e48d8 143 workerNodeKey: number,
2377984d
JB
144 weight: number,
145 runTime: number
146 ): void {
f06e48d8 147 this.workersTaskRunTime.set(workerNodeKey, {
2377984d
JB
148 weight,
149 runTime
150 })
151 }
152
f06e48d8 153 private getWorkerVirtualTaskRunTime (workerNodeKey: number): number {
da309861
JB
154 return this.requiredStatistics.medRunTime
155 ? this.pool.workerNodes[workerNodeKey].tasksUsage.medRunTime
156 : this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime
2377984d
JB
157 }
158
159 private computeWorkerWeight (): number {
b3432a63 160 let cpusCycleTimeWeight = 0
a59e741b 161 for (const cpu of cpus()) {
b3432a63 162 // CPU estimated cycle time
d8a610ca
JB
163 const numberOfDigits = cpu.speed.toString().length - 1
164 const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
165 cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
b3432a63 166 }
7b0d35b8 167 return Math.round(cpusCycleTimeWeight / cpus().length)
b3432a63 168 }
b3432a63 169}