Commit | Line | Data |
---|---|---|
fc3e6586 | 1 | import { cpus } from 'node:os' |
f06e48d8 | 2 | import type { IWorker } from '../worker' |
b3432a63 | 3 | import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' |
bf90656c JB |
4 | import type { |
5 | IWorkerChoiceStrategy, | |
da309861 JB |
6 | RequiredStatistics, |
7 | WorkerChoiceStrategyOptions | |
bf90656c | 8 | } from './selection-strategies-types' |
c4855468 | 9 | import type { IPool } from '../pool' |
2fc5cae3 | 10 | import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' |
b3432a63 JB |
11 | |
/**
 * Virtual task runtime bookkeeping kept for one worker node.
 */
interface TaskRunTime {
  /**
   * Virtual runtime budget: the worker node stays the chosen one while its
   * accumulated `runTime` is lower than this value.
   */
  weight: number
  /**
   * Virtual task runtime accumulated by the worker node since it last became
   * the chosen one.
   */
  runTime: number
}
19 | ||
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * The current worker node keeps being chosen while its accumulated virtual task
 * runtime is lower than its weight; once the weight is reached the selection
 * moves on to the next worker node, wrapping around at the end of the pool.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 */
export class WeightedRoundRobinWorkerChoiceStrategy<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>
  extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
  implements IWorkerChoiceStrategy {
  /** @inheritDoc */
  public readonly requiredStatistics: RequiredStatistics = {
    runTime: true,
    avgRunTime: true,
    medRunTime: false
  }

  /**
   * Worker node id where the current task will be submitted.
   */
  private currentWorkerNodeId: number = 0
  /**
   * Default worker weight.
   */
  private readonly defaultWorkerWeight: number
  /**
   * Workers' virtual task runtime, keyed by worker node key.
   */
  private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
  number,
  TaskRunTime
  >()

  /** @inheritDoc */
  public constructor (
    pool: IPool<Worker, Data, Response>,
    opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  ) {
    super(pool, opts)
    // NOTE(review): presumably re-applied here because the field initializers
    // above run after super() returns - confirm against the base class.
    this.checkOptions(opts)
    this.defaultWorkerWeight = this.computeWorkerWeight()
    this.initWorkersTaskRunTime()
  }

  /** @inheritDoc */
  public reset (): boolean {
    this.currentWorkerNodeId = 0
    this.workersTaskRunTime.clear()
    this.initWorkersTaskRunTime()
    return true
  }

  /** @inheritDoc */
  public choose (): number {
    const chosenWorkerNodeKey = this.currentWorkerNodeId
    if (
      this.isDynamicPool &&
      !this.workersTaskRunTime.has(chosenWorkerNodeKey)
    ) {
      // The worker node has no bookkeeping entry yet: create it on first use.
      this.initWorkerTaskRunTime(chosenWorkerNodeKey)
    }
    const chosenTaskRunTime = this.workersTaskRunTime.get(chosenWorkerNodeKey)
    const workerTaskRunTime = chosenTaskRunTime?.runTime ?? 0
    const workerTaskWeight =
      chosenTaskRunTime?.weight ?? this.defaultWorkerWeight
    if (workerTaskRunTime < workerTaskWeight) {
      // Budget not exhausted: charge one more virtual task to this worker node.
      this.setWorkerTaskRunTime(
        chosenWorkerNodeKey,
        workerTaskWeight,
        workerTaskRunTime +
          this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey)
      )
    } else {
      // Budget exhausted: move the cursor to the next worker node. `>=` makes
      // an out-of-range cursor (or an empty pool) wrap back to the first key.
      this.currentWorkerNodeId =
        this.currentWorkerNodeId >= this.pool.workerNodes.length - 1
          ? 0
          : this.currentWorkerNodeId + 1
      // Restart the next worker node's runtime while keeping its own weight.
      const nextWorkerWeight =
        this.workersTaskRunTime.get(this.currentWorkerNodeId)?.weight ??
        this.defaultWorkerWeight
      this.setWorkerTaskRunTime(this.currentWorkerNodeId, nextWorkerWeight, 0)
    }
    return chosenWorkerNodeKey
  }

  /** @inheritDoc */
  public remove (workerNodeKey: number): boolean {
    // The clamp below relies on pool.workerNodes no longer containing the
    // removed worker node when this method is called.
    const lastWorkerNodeKey = this.pool.workerNodes.length - 1
    if (workerNodeKey < this.currentWorkerNodeId) {
      // Every worker node after the removed one moved down by one key: make
      // the cursor follow the worker node it was pointing at.
      this.currentWorkerNodeId -= 1
    }
    if (this.currentWorkerNodeId > lastWorkerNodeKey) {
      // Keep the cursor inside the pool (0 when the pool is empty).
      this.currentWorkerNodeId = Math.max(lastWorkerNodeKey, 0)
    }
    const deleted = this.workersTaskRunTime.delete(workerNodeKey)
    // Re-key the entries located after the removed one. A sorted snapshot is
    // used so the map is not mutated while being iterated and no entry is
    // overwritten before it has been moved; the old key is always dropped.
    const entriesToShift = Array.from(this.workersTaskRunTime.entries())
      .filter(([key]) => key > workerNodeKey)
      .sort(([keyA], [keyB]) => keyA - keyB)
    for (const [key, value] of entriesToShift) {
      this.workersTaskRunTime.delete(key)
      this.workersTaskRunTime.set(key - 1, value)
    }
    return deleted
  }

  /**
   * Creates a fresh bookkeeping entry for every worker node of the pool.
   */
  private initWorkersTaskRunTime (): void {
    for (const [index] of this.pool.workerNodes.entries()) {
      this.initWorkerTaskRunTime(index)
    }
  }

  /**
   * Creates a fresh bookkeeping entry (default weight, zero runtime) for the
   * given worker node.
   *
   * @param workerNodeKey - The worker node key.
   */
  private initWorkerTaskRunTime (workerNodeKey: number): void {
    this.setWorkerTaskRunTime(workerNodeKey, this.defaultWorkerWeight, 0)
  }

  /**
   * Stores the weight and accumulated virtual runtime of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @param weight - The worker node weight.
   * @param runTime - The worker node accumulated virtual task runtime.
   */
  private setWorkerTaskRunTime (
    workerNodeKey: number,
    weight: number,
    runTime: number
  ): void {
    this.workersTaskRunTime.set(workerNodeKey, {
      weight,
      runTime
    })
  }

  /**
   * Gets the virtual runtime charged for one task on the given worker node:
   * its median task runtime when `requiredStatistics.medRunTime` is set, its
   * average task runtime otherwise.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The virtual task runtime, `0` when the worker node or the
   * statistic is not available.
   */
  private getWorkerVirtualTaskRunTime (workerNodeKey: number): number {
    const workerNode = this.pool.workerNodes[workerNodeKey]
    if (workerNode == null) {
      return 0
    }
    return (
      (this.requiredStatistics.medRunTime
        ? workerNode.tasksUsage.medRunTime
        : workerNode.tasksUsage.avgRunTime) ?? 0
    )
  }

  /**
   * Computes the default worker weight from the estimated cycle time of the
   * host CPUs, averaged over the CPUs reporting a usable speed.
   *
   * @returns The rounded default worker weight, `0` when no CPU reports a
   * usable speed. With a weight of `0` `choose()` moves to the next worker node
   * on every call, i.e. the strategy degrades to a plain round robin.
   */
  private computeWorkerWeight (): number {
    let cpusCycleTimeWeight = 0
    let usableCpus = 0
    for (const cpu of cpus()) {
      // A missing or zero speed would turn the weight into Infinity or NaN.
      if (!(cpu.speed > 0)) {
        continue
      }
      // CPU estimated cycle time
      const numberOfDigits = cpu.speed.toString().length - 1
      const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
      usableCpus++
    }
    return usableCpus > 0 ? Math.round(cpusCycleTimeWeight / usableCpus) : 0
  }
}