Commit | Line | Data |
---|---|---|
fc3e6586 | 1 | import { cpus } from 'node:os' |
f06e48d8 | 2 | import type { IWorker } from '../worker' |
b3432a63 | 3 | import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' |
bf90656c JB |
4 | import type { |
5 | IWorkerChoiceStrategy, | |
da309861 JB |
6 | RequiredStatistics, |
7 | WorkerChoiceStrategyOptions | |
bf90656c | 8 | } from './selection-strategies-types' |
c4855468 | 9 | import type { IPool } from '../pool' |
b3432a63 JB |
10 | |
11 | /** | |
23135a89 | 12 | * Virtual task runtime. |
b3432a63 | 13 | */ |
78cea37e | 14 | interface TaskRunTime { |
b3432a63 JB |
15 | weight: number |
16 | runTime: number | |
17 | } | |
18 | ||
19 | /** | |
20 | * Selects the next worker with a weighted round robin scheduling algorithm. | |
21 | * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin. | |
22 | * | |
38e795c1 JB |
23 | * @typeParam Worker - Type of worker which manages the strategy. |
24 | * @typeParam Data - Type of data sent to the worker. This can only be serializable data. | |
25 | * @typeParam Response - Type of response of execution. This can only be serializable data. | |
b3432a63 JB |
26 | */ |
27 | export class WeightedRoundRobinWorkerChoiceStrategy< | |
f06e48d8 | 28 | Worker extends IWorker, |
b2b1d84e JB |
29 | Data = unknown, |
30 | Response = unknown | |
bf90656c JB |
31 | > |
32 | extends AbstractWorkerChoiceStrategy<Worker, Data, Response> | |
17393ac8 | 33 | implements IWorkerChoiceStrategy { |
afc003b2 | 34 | /** @inheritDoc */ |
ea7a90d3 | 35 | public readonly requiredStatistics: RequiredStatistics = { |
c6bd2650 | 36 | runTime: true, |
78099a15 JB |
37 | avgRunTime: true, |
38 | medRunTime: false | |
10fcfaf4 JB |
39 | } |
40 | ||
b3432a63 | 41 | /** |
f06e48d8 | 42 | * Worker node id where the current task will be submitted. |
b3432a63 | 43 | */ |
f06e48d8 | 44 | private currentWorkerNodeId: number = 0 |
b3432a63 JB |
45 | /** |
46 | * Default worker weight. | |
47 | */ | |
777af0ac | 48 | private readonly defaultWorkerWeight: number |
b3432a63 | 49 | /** |
f06e48d8 | 50 | * Workers' virtual task runtime. |
b3432a63 | 51 | */ |
c923ce56 JB |
52 | private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map< |
53 | number, | |
78cea37e | 54 | TaskRunTime |
b3432a63 JB |
55 | >() |
56 | ||
57 | /** | |
23ff945a | 58 | * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm. |
b3432a63 | 59 | * |
38e795c1 | 60 | * @param pool - The pool instance. |
da309861 | 61 | * @param opts - The worker choice strategy options. |
b3432a63 | 62 | */ |
da309861 | 63 | public constructor ( |
c4855468 | 64 | pool: IPool<Worker, Data, Response>, |
da309861 JB |
65 | opts?: WorkerChoiceStrategyOptions |
66 | ) { | |
67 | super(pool, opts) | |
b3432a63 | 68 | this.defaultWorkerWeight = this.computeWorkerWeight() |
2377984d | 69 | this.initWorkersTaskRunTime() |
b3432a63 JB |
70 | } |
71 | ||
afc003b2 | 72 | /** @inheritDoc */ |
a6f7f1b4 | 73 | public reset (): boolean { |
f06e48d8 | 74 | this.currentWorkerNodeId = 0 |
ea7a90d3 JB |
75 | this.workersTaskRunTime.clear() |
76 | this.initWorkersTaskRunTime() | |
77 | return true | |
78 | } | |
79 | ||
afc003b2 | 80 | /** @inheritDoc */ |
c923ce56 | 81 | public choose (): number { |
f06e48d8 JB |
82 | const chosenWorkerNodeKey = this.currentWorkerNodeId |
83 | if ( | |
84 | this.isDynamicPool && | |
85 | !this.workersTaskRunTime.has(chosenWorkerNodeKey) | |
86 | ) { | |
87 | this.initWorkerTaskRunTime(chosenWorkerNodeKey) | |
b3432a63 | 88 | } |
d8a610ca | 89 | const workerTaskRunTime = |
f06e48d8 | 90 | this.workersTaskRunTime.get(chosenWorkerNodeKey)?.runTime ?? 0 |
b3432a63 | 91 | const workerTaskWeight = |
f06e48d8 | 92 | this.workersTaskRunTime.get(chosenWorkerNodeKey)?.weight ?? |
b3432a63 | 93 | this.defaultWorkerWeight |
553ad720 | 94 | if (workerTaskRunTime < workerTaskWeight) { |
2377984d | 95 | this.setWorkerTaskRunTime( |
f06e48d8 | 96 | chosenWorkerNodeKey, |
2377984d | 97 | workerTaskWeight, |
553ad720 | 98 | workerTaskRunTime + |
f06e48d8 | 99 | (this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey) ?? 0) |
2377984d | 100 | ) |
b3432a63 | 101 | } else { |
f06e48d8 JB |
102 | this.currentWorkerNodeId = |
103 | this.currentWorkerNodeId === this.pool.workerNodes.length - 1 | |
b3432a63 | 104 | ? 0 |
f06e48d8 JB |
105 | : this.currentWorkerNodeId + 1 |
106 | this.setWorkerTaskRunTime(this.currentWorkerNodeId, workerTaskWeight, 0) | |
b3432a63 | 107 | } |
f06e48d8 | 108 | return chosenWorkerNodeKey |
b3432a63 JB |
109 | } |
110 | ||
afc003b2 | 111 | /** @inheritDoc */ |
f06e48d8 JB |
112 | public remove (workerNodeKey: number): boolean { |
113 | if (this.currentWorkerNodeId === workerNodeKey) { | |
114 | if (this.pool.workerNodes.length === 0) { | |
115 | this.currentWorkerNodeId = 0 | |
78ab2555 | 116 | } else { |
f06e48d8 JB |
117 | this.currentWorkerNodeId = |
118 | this.currentWorkerNodeId > this.pool.workerNodes.length - 1 | |
119 | ? this.pool.workerNodes.length - 1 | |
120 | : this.currentWorkerNodeId | |
78ab2555 | 121 | } |
97a2abc3 | 122 | } |
f06e48d8 | 123 | const deleted = this.workersTaskRunTime.delete(workerNodeKey) |
97a2abc3 | 124 | for (const [key, value] of this.workersTaskRunTime) { |
f06e48d8 | 125 | if (key > workerNodeKey) { |
97a2abc3 JB |
126 | this.workersTaskRunTime.set(key - 1, value) |
127 | } | |
128 | } | |
f06e48d8 | 129 | return deleted |
97a2abc3 JB |
130 | } |
131 | ||
2377984d | 132 | private initWorkersTaskRunTime (): void { |
f06e48d8 | 133 | for (const [index] of this.pool.workerNodes.entries()) { |
c923ce56 | 134 | this.initWorkerTaskRunTime(index) |
2377984d JB |
135 | } |
136 | } | |
137 | ||
f06e48d8 JB |
138 | private initWorkerTaskRunTime (workerNodeKey: number): void { |
139 | this.setWorkerTaskRunTime(workerNodeKey, this.defaultWorkerWeight, 0) | |
2377984d JB |
140 | } |
141 | ||
142 | private setWorkerTaskRunTime ( | |
f06e48d8 | 143 | workerNodeKey: number, |
2377984d JB |
144 | weight: number, |
145 | runTime: number | |
146 | ): void { | |
f06e48d8 | 147 | this.workersTaskRunTime.set(workerNodeKey, { |
2377984d JB |
148 | weight, |
149 | runTime | |
150 | }) | |
151 | } | |
152 | ||
f06e48d8 | 153 | private getWorkerVirtualTaskRunTime (workerNodeKey: number): number { |
da309861 JB |
154 | return this.requiredStatistics.medRunTime |
155 | ? this.pool.workerNodes[workerNodeKey].tasksUsage.medRunTime | |
156 | : this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime | |
2377984d JB |
157 | } |
158 | ||
159 | private computeWorkerWeight (): number { | |
b3432a63 | 160 | let cpusCycleTimeWeight = 0 |
a59e741b | 161 | for (const cpu of cpus()) { |
b3432a63 | 162 | // CPU estimated cycle time |
d8a610ca JB |
163 | const numberOfDigits = cpu.speed.toString().length - 1 |
164 | const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) | |
165 | cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) | |
b3432a63 | 166 | } |
7b0d35b8 | 167 | return Math.round(cpusCycleTimeWeight / cpus().length) |
b3432a63 | 168 | } |
b3432a63 | 169 | } |