0db54c4fd7b6e7279da25512fa7f18c140402da8
[poolifier.git] / src / pools / selection-strategies / interleaved-weighted-round-robin-worker-choice-strategy.ts
1 import type { IPool } from '../pool.js'
2 import type { IWorker } from '../worker.js'
3 import type {
4 IWorkerChoiceStrategy,
5 TaskStatisticsRequirements,
6 WorkerChoiceStrategyOptions,
7 } from './selection-strategies-types.js'
8
9 import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js'
10 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js'
11
12 /**
13 * Selects the next worker with an interleaved weighted round robin scheduling algorithm.
14 * @typeParam Worker - Type of worker which manages the strategy.
15 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
16 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
17 */
18 export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
19 Worker extends IWorker,
20 Data = unknown,
21 Response = unknown
22 >
23 extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
24 implements IWorkerChoiceStrategy {
25 /**
26 * Round id.
27 */
28 private roundId = 0
29
30 /**
31 * Round weights.
32 */
33 private roundWeights: number[]
34 /**
35 * Worker node id.
36 */
37 private workerNodeId = 0
38 /**
39 * Worker node virtual execution time.
40 */
41 private workerNodeVirtualTaskExecutionTime = 0
42 /** @inheritDoc */
43 public override readonly taskStatisticsRequirements: TaskStatisticsRequirements =
44 {
45 elu: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
46 runTime: {
47 aggregate: true,
48 average: true,
49 median: false,
50 },
51 waitTime: {
52 aggregate: true,
53 average: true,
54 median: false,
55 },
56 }
57
58 /** @inheritDoc */
59 public constructor (
60 pool: IPool<Worker, Data, Response>,
61 opts?: WorkerChoiceStrategyOptions
62 ) {
63 super(pool, opts)
64 this.setTaskStatisticsRequirements(this.opts)
65 this.roundWeights = this.getRoundWeights()
66 }
67
68 private getRoundWeights (): number[] {
69 return [
70 ...new Set(
71 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
72 Object.values(this.opts!.weights!)
73 .slice()
74 .sort((a, b) => a - b)
75 ),
76 ]
77 }
78
79 private interleavedWeightedRoundRobinNextWorkerNodeId (): void {
80 if (this.pool.workerNodes.length === 0) {
81 this.workerNodeId = 0
82 } else if (
83 this.roundId === this.roundWeights.length - 1 &&
84 this.workerNodeId === this.pool.workerNodes.length - 1
85 ) {
86 this.roundId = 0
87 this.workerNodeId = 0
88 } else if (this.workerNodeId === this.pool.workerNodes.length - 1) {
89 this.roundId = this.roundId + 1
90 this.workerNodeId = 0
91 } else {
92 this.workerNodeId = this.workerNodeId + 1
93 }
94 }
95
96 /** @inheritDoc */
97 public choose (): number | undefined {
98 for (
99 let roundIndex = this.roundId;
100 roundIndex < this.roundWeights.length;
101 roundIndex++
102 ) {
103 this.roundId = roundIndex
104 for (
105 let workerNodeKey = this.workerNodeId;
106 workerNodeKey < this.pool.workerNodes.length;
107 workerNodeKey++
108 ) {
109 this.workerNodeId = workerNodeKey
110 if (
111 this.workerNodeId !== this.nextWorkerNodeKey &&
112 this.workerNodeVirtualTaskExecutionTime !== 0
113 ) {
114 this.workerNodeVirtualTaskExecutionTime = 0
115 }
116 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
117 const workerWeight = this.opts!.weights![workerNodeKey]
118 if (
119 this.isWorkerNodeReady(workerNodeKey) &&
120 workerWeight >= this.roundWeights[roundIndex] &&
121 this.workerNodeVirtualTaskExecutionTime < workerWeight
122 ) {
123 this.workerNodeVirtualTaskExecutionTime +=
124 this.getWorkerNodeTaskWaitTime(workerNodeKey) +
125 this.getWorkerNodeTaskRunTime(workerNodeKey)
126 this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
127 this.nextWorkerNodeKey = workerNodeKey
128 return this.nextWorkerNodeKey
129 }
130 }
131 }
132 this.interleavedWeightedRoundRobinNextWorkerNodeId()
133 }
134
135 /** @inheritDoc */
136 public remove (workerNodeKey: number): boolean {
137 if (this.pool.workerNodes.length === 0) {
138 this.resetWorkerNodeKeyProperties()
139 this.workerNodeId = 0
140 this.workerNodeVirtualTaskExecutionTime = 0
141 return true
142 }
143 if (
144 this.workerNodeId === workerNodeKey &&
145 this.workerNodeId > this.pool.workerNodes.length - 1
146 ) {
147 this.workerNodeId = this.pool.workerNodes.length - 1
148 }
149 if (
150 this.previousWorkerNodeKey === workerNodeKey &&
151 this.previousWorkerNodeKey > this.pool.workerNodes.length - 1
152 ) {
153 this.previousWorkerNodeKey = this.pool.workerNodes.length - 1
154 }
155 return true
156 }
157
158 /** @inheritDoc */
159 public reset (): boolean {
160 this.resetWorkerNodeKeyProperties()
161 this.roundId = 0
162 this.workerNodeId = 0
163 this.workerNodeVirtualTaskExecutionTime = 0
164 return true
165 }
166
167 /** @inheritDoc */
168 public override setOptions (
169 opts: undefined | WorkerChoiceStrategyOptions
170 ): void {
171 super.setOptions(opts)
172 this.roundWeights = this.getRoundWeights()
173 }
174
175 /** @inheritDoc */
176 public update (): boolean {
177 return true
178 }
179 }