Commit | Line | Data |
---|---|---|
feec6e8c JB |
1 | import type { IWorker } from '../worker' |
2 | import type { IPool } from '../pool' | |
3 | import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' | |
4 | import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' | |
5 | import type { | |
6 | IWorkerChoiceStrategy, | |
6c6afb84 | 7 | StrategyPolicy, |
feec6e8c JB |
8 | WorkerChoiceStrategyOptions |
9 | } from './selection-strategies-types' | |
10 | ||
11 | /** | |
12 | * Selects the next worker with an interleaved weighted round robin scheduling algorithm. | |
13 | * | |
14 | * @typeParam Worker - Type of worker which manages the strategy. | |
15 | * @typeParam Data - Type of data sent to the worker. This can only be serializable data. | |
16 | * @typeParam Response - Type of execution response. This can only be serializable data. | |
17 | */ | |
18 | export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< | |
19 | Worker extends IWorker, | |
20 | Data = unknown, | |
21 | Response = unknown | |
22 | > | |
23 | extends AbstractWorkerChoiceStrategy<Worker, Data, Response> | |
24 | implements IWorkerChoiceStrategy { | |
6c6afb84 JB |
25 | /** @inheritDoc */ |
26 | public readonly strategyPolicy: StrategyPolicy = { | |
27 | useDynamicWorker: true | |
28 | } | |
29 | ||
feec6e8c | 30 | /** |
d33be430 | 31 | * Round id. |
feec6e8c JB |
32 | * This is used to determine the current round weight. |
33 | */ | |
d33be430 | 34 | private roundId: number = 0 |
feec6e8c JB |
35 | /** |
36 | * Round weights. | |
37 | */ | |
e4854a4e | 38 | private roundWeights: number[] |
feec6e8c JB |
39 | /** |
40 | * Default worker weight. | |
41 | */ | |
42 | private readonly defaultWorkerWeight: number | |
43 | ||
44 | /** @inheritDoc */ | |
45 | public constructor ( | |
46 | pool: IPool<Worker, Data, Response>, | |
47 | opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS | |
48 | ) { | |
49 | super(pool, opts) | |
932fc8be | 50 | this.setTaskStatisticsRequirements(this.opts) |
feec6e8c JB |
51 | this.defaultWorkerWeight = this.computeDefaultWorkerWeight() |
52 | this.roundWeights = this.getRoundWeights() | |
53 | } | |
54 | ||
55 | /** @inheritDoc */ | |
56 | public reset (): boolean { | |
d33be430 JB |
57 | this.nextWorkerNodeId = 0 |
58 | this.roundId = 0 | |
feec6e8c JB |
59 | return true |
60 | } | |
61 | ||
62 | /** @inheritDoc */ | |
63 | public update (): boolean { | |
64 | return true | |
65 | } | |
66 | ||
67 | /** @inheritDoc */ | |
68 | public choose (): number { | |
297f3bbe JB |
69 | let roundId: number | undefined |
70 | let workerNodeId: number | undefined | |
71 | for ( | |
d33be430 | 72 | let roundIndex = this.roundId; |
d3127e84 JB |
73 | roundIndex < this.roundWeights.length; |
74 | roundIndex++ | |
297f3bbe | 75 | ) { |
feec6e8c | 76 | for ( |
d33be430 | 77 | let workerNodeKey = this.nextWorkerNodeId; |
297f3bbe JB |
78 | workerNodeKey < this.pool.workerNodes.length; |
79 | workerNodeKey++ | |
feec6e8c | 80 | ) { |
297f3bbe JB |
81 | const workerWeight = |
82 | this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight | |
d3127e84 JB |
83 | if (workerWeight >= this.roundWeights[roundIndex]) { |
84 | roundId = roundIndex | |
297f3bbe JB |
85 | workerNodeId = workerNodeKey |
86 | break | |
feec6e8c JB |
87 | } |
88 | } | |
297f3bbe | 89 | } |
d33be430 JB |
90 | this.roundId = roundId ?? 0 |
91 | this.nextWorkerNodeId = workerNodeId ?? 0 | |
92 | const chosenWorkerNodeKey = this.nextWorkerNodeId | |
93 | if (this.nextWorkerNodeId === this.pool.workerNodes.length - 1) { | |
94 | this.nextWorkerNodeId = 0 | |
95 | this.roundId = | |
96 | this.roundId === this.roundWeights.length - 1 ? 0 : this.roundId + 1 | |
297f3bbe | 97 | } else { |
d33be430 | 98 | this.nextWorkerNodeId = this.nextWorkerNodeId + 1 |
feec6e8c JB |
99 | } |
100 | return chosenWorkerNodeKey | |
101 | } | |
102 | ||
103 | /** @inheritDoc */ | |
104 | public remove (workerNodeKey: number): boolean { | |
d33be430 | 105 | if (this.nextWorkerNodeId === workerNodeKey) { |
feec6e8c | 106 | if (this.pool.workerNodes.length === 0) { |
d33be430 JB |
107 | this.nextWorkerNodeId = 0 |
108 | } else if (this.nextWorkerNodeId > this.pool.workerNodes.length - 1) { | |
109 | this.nextWorkerNodeId = this.pool.workerNodes.length - 1 | |
110 | this.roundId = | |
111 | this.roundId === this.roundWeights.length - 1 ? 0 : this.roundId + 1 | |
feec6e8c JB |
112 | } |
113 | } | |
114 | return true | |
115 | } | |
116 | ||
e4854a4e JB |
117 | /** @inheritDoc */ |
118 | public setOptions (opts: WorkerChoiceStrategyOptions): void { | |
119 | super.setOptions(opts) | |
120 | this.roundWeights = this.getRoundWeights() | |
121 | } | |
122 | ||
feec6e8c JB |
123 | private getRoundWeights (): number[] { |
124 | if (this.opts.weights == null) { | |
125 | return [this.defaultWorkerWeight] | |
126 | } | |
127 | return [ | |
128 | ...new Set( | |
129 | Object.values(this.opts.weights) | |
130 | .slice() | |
131 | .sort((a, b) => a - b) | |
132 | ) | |
133 | ] | |
134 | } | |
135 | } |