Commit | Line | Data |
---|---|---|
feec6e8c JB |
1 | import { cpus } from 'node:os' |
2 | import type { IWorker } from '../worker' | |
3 | import type { IPool } from '../pool' | |
4 | import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' | |
5 | import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' | |
6 | import type { | |
7 | IWorkerChoiceStrategy, | |
8 | RequiredStatistics, | |
9 | WorkerChoiceStrategyOptions | |
10 | } from './selection-strategies-types' | |
11 | ||
/**
 * Selects the next worker with an interleaved weighted round robin scheduling algorithm.
 *
 * The distinct worker weights, sorted in ascending order, define the rounds.
 * During a round, only the workers whose weight is greater than or equal to
 * the round weight are eligible, so a worker with a higher weight is eligible
 * in more rounds than a worker with a lower weight.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 */
export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
    Worker extends IWorker,
    Data = unknown,
    Response = unknown
  >
  extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
  implements IWorkerChoiceStrategy {
  /** @inheritDoc */
  public readonly requiredStatistics: RequiredStatistics = {
    runTime: true,
    avgRunTime: true,
    medRunTime: false
  }

  /**
   * Worker node id where the scan for the next eligible worker starts.
   */
  private currentWorkerNodeId: number = 0
  /**
   * Current round id.
   * This is used to determine the current round weight.
   */
  private currentRoundId: number = 0
  /**
   * Round weights: the distinct worker weights sorted in ascending order.
   */
  private roundWeights: number[]
  /**
   * Default worker weight, used for every worker without an explicit weight.
   */
  private readonly defaultWorkerWeight: number

  /** @inheritDoc */
  public constructor (
    pool: IPool<Worker, Data, Response>,
    opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  ) {
    super(pool, opts)
    this.checkOptions(this.opts)
    // The default weight must be known before the round weights are built,
    // since it is the only round weight when no weights are configured.
    this.defaultWorkerWeight = this.computeDefaultWorkerWeight()
    this.roundWeights = this.getRoundWeights()
  }

  /** @inheritDoc */
  public reset (): boolean {
    this.currentWorkerNodeId = 0
    this.currentRoundId = 0
    return true
  }

  /** @inheritDoc */
  public update (): boolean {
    return true
  }

  /**
   * Chooses the next eligible worker node, scanning from the current
   * (round, worker node) position.
   *
   * The scan walks the remaining worker nodes of the current round, then the
   * following rounds from the first worker node, wrapping around to the first
   * round. It stops at the first worker node whose weight is greater than or
   * equal to the round weight, and moves the position just past it.
   *
   * @returns The chosen worker node key, or `0` if no worker node is eligible
   * in any round.
   */
  public choose (): number {
    const workerNodesCount = this.pool.workerNodes.length
    const roundsCount = this.roundWeights.length
    // The stored position can be out of range after the pool shrank or the
    // weights changed: restart from the beginning in that case.
    let roundId = this.currentRoundId < roundsCount ? this.currentRoundId : 0
    let workerNodeId =
      this.currentWorkerNodeId < workerNodesCount ? this.currentWorkerNodeId : 0
    // One extra pass is needed because the first pass may start in the middle
    // of a round: the start of that round is only covered after wrapping.
    for (let pass = 0; pass <= roundsCount; pass++) {
      for (; workerNodeId < workerNodesCount; workerNodeId++) {
        const workerWeight =
          this.opts.weights?.[workerNodeId] ?? this.defaultWorkerWeight
        if (workerWeight >= this.roundWeights[roundId]) {
          // A round ends once its last worker node has been considered.
          const isLastWorkerNode = workerNodeId === workerNodesCount - 1
          this.currentWorkerNodeId = isLastWorkerNode ? 0 : workerNodeId + 1
          this.currentRoundId = isLastWorkerNode
            ? this.getNextRoundId(roundId)
            : roundId
          return workerNodeId
        }
      }
      workerNodeId = 0
      roundId = this.getNextRoundId(roundId)
    }
    // No worker node is eligible in any round: fall back to the first one.
    this.currentRoundId = 0
    this.currentWorkerNodeId = 0
    return 0
  }

  /**
   * Keeps the current worker node id within the pool bounds when the worker
   * node it points to is removed.
   *
   * @param workerNodeKey - Key of the removed worker node.
   * @returns Always `true`.
   */
  public remove (workerNodeKey: number): boolean {
    if (this.currentWorkerNodeId === workerNodeKey) {
      if (this.pool.workerNodes.length === 0) {
        this.currentWorkerNodeId = 0
      } else {
        this.currentWorkerNodeId =
          this.currentWorkerNodeId > this.pool.workerNodes.length - 1
            ? this.pool.workerNodes.length - 1
            : this.currentWorkerNodeId
      }
    }
    return true
  }

  /** @inheritDoc */
  public setOptions (opts: WorkerChoiceStrategyOptions): void {
    super.setOptions(opts)
    // The rounds are derived from the weights: rebuild them.
    this.roundWeights = this.getRoundWeights()
  }

  /**
   * Gets the id of the round following the given one, wrapping around to the
   * first round after the last one.
   *
   * @param roundId - Round id to advance from.
   * @returns The next round id.
   */
  private getNextRoundId (roundId: number): number {
    return roundId >= this.roundWeights.length - 1 ? 0 : roundId + 1
  }

  /**
   * Computes the default worker weight as the rounded average of the
   * estimated cycle time of the CPUs reporting a usable speed.
   *
   * @returns The default worker weight, or `1` if no CPU reports a usable
   * speed.
   */
  private computeDefaultWorkerWeight (): number {
    // Arbitrary positive weight used when the CPU speed is unknown. It is
    // neutral as long as every worker uses the default weight.
    const fallbackWorkerWeight = 1
    let cpusCycleTimeWeight = 0
    let usableCpusCount = 0
    for (const cpu of cpus()) {
      // A zero or invalid speed would yield an infinite or NaN weight.
      if (!Number.isFinite(cpu.speed) || cpu.speed <= 0) {
        continue
      }
      // CPU estimated cycle time
      const numberOfDigits = cpu.speed.toString().length - 1
      const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
      usableCpusCount++
    }
    if (usableCpusCount === 0) {
      return fallbackWorkerWeight
    }
    return Math.round(cpusCycleTimeWeight / usableCpusCount)
  }

  /**
   * Builds the round weights from the configured worker weights.
   *
   * @returns The distinct configured weights sorted in ascending order, or
   * the default worker weight alone if no weight is configured.
   */
  private getRoundWeights (): number[] {
    if (this.opts.weights == null) {
      return [this.defaultWorkerWeight]
    }
    const distinctWeights = [...new Set(Object.values(this.opts.weights))]
    if (distinctWeights.length === 0) {
      // An empty weights object means every worker uses the default weight.
      return [this.defaultWorkerWeight]
    }
    return distinctWeights.sort((a, b) => a - b)
  }
}