fix: ensure the number of worker choice retries is enough for WRR
[poolifier.git] / src / pools / selection-strategies / worker-choice-strategy-context.ts
1 import { getDefaultInternalWorkerChoiceStrategyOptions } from '../../utils'
2 import type { IPool } from '../pool'
3 import type { IWorker } from '../worker'
4 import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy'
5 import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy'
6 import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy'
7 import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy'
8 import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy'
9 import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
10 import type {
11 IWorkerChoiceStrategy,
12 InternalWorkerChoiceStrategyOptions,
13 StrategyPolicy,
14 TaskStatisticsRequirements,
15 WorkerChoiceStrategy
16 } from './selection-strategies-types'
17 import { WorkerChoiceStrategies } from './selection-strategies-types'
18 import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy'
19
/**
 * The worker choice strategy context.
 *
 * Holds one instance of every known worker choice strategy and delegates the
 * worker node selection to the strategy currently set in the context.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 */
export class WorkerChoiceStrategyContext<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> {
  /** Strategy instances, keyed by worker choice strategy. */
  private readonly workerChoiceStrategies: Map<
  WorkerChoiceStrategy,
  IWorkerChoiceStrategy
  >

  /**
   * Worker choice strategy context constructor.
   *
   * @param pool - The pool instance.
   * @param workerChoiceStrategy - The worker choice strategy.
   * @param opts - The worker choice strategy options.
   */
  public constructor (
    pool: IPool<Worker, Data, Response>,
    private workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN,
    private opts?: InternalWorkerChoiceStrategyOptions
  ) {
    this.opts = this.buildOptions(pool, this.opts)
    // Keep `execute` usable as a detached callback.
    this.execute = this.execute.bind(this)
    this.workerChoiceStrategies = new Map<
    WorkerChoiceStrategy,
    IWorkerChoiceStrategy
    >([
      [
        WorkerChoiceStrategies.ROUND_ROBIN,
        new (RoundRobinWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
          pool,
          this.opts
        )
      ],
      [
        WorkerChoiceStrategies.LEAST_USED,
        new (LeastUsedWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
          pool,
          this.opts
        )
      ],
      [
        WorkerChoiceStrategies.LEAST_BUSY,
        new (LeastBusyWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
          pool,
          this.opts
        )
      ],
      [
        WorkerChoiceStrategies.LEAST_ELU,
        new (LeastEluWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
          pool,
          this.opts
        )
      ],
      [
        WorkerChoiceStrategies.FAIR_SHARE,
        new (FairShareWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
          pool,
          this.opts
        )
      ],
      [
        WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
        new (WeightedRoundRobinWorkerChoiceStrategy.bind(this))<
        Worker,
        Data,
        Response
        >(pool, this.opts)
      ],
      [
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN,
        new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind(this))<
        Worker,
        Data,
        Response
        >(pool, this.opts)
      ]
    ])
  }

  /**
   * Gets the strategy policy in the context.
   *
   * @returns The strategy policy.
   */
  public getStrategyPolicy (): StrategyPolicy {
    return this.getCurrentStrategy().strategyPolicy
  }

  /**
   * Gets the worker choice strategy in the context task statistics requirements.
   *
   * @returns The task statistics requirements.
   */
  public getTaskStatisticsRequirements (): TaskStatisticsRequirements {
    return this.getCurrentStrategy().taskStatisticsRequirements
  }

  /**
   * Sets the worker choice strategy to use in the context.
   * The selected strategy is reset, even when it was already the current one.
   *
   * @param workerChoiceStrategy - The worker choice strategy to set.
   */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategies.get(this.workerChoiceStrategy)?.reset()
  }

  /**
   * Updates the worker node key in the worker choice strategy in the context internals.
   *
   * @param workerNodeKey - The worker node key.
   * @returns `true` if the update is successful, `false` otherwise.
   */
  public update (workerNodeKey: number): boolean {
    return this.getCurrentStrategy().update(workerNodeKey)
  }

  /**
   * Executes the worker choice strategy in the context algorithm.
   *
   * @returns The key of the worker node.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If no worker node is ready in the pool after configured retries.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined.
   */
  public execute (): number {
    const strategy = this.getCurrentStrategy()
    const maxRetries = this.opts?.retries ?? 0
    let readinessRetries = 0
    // Re-check readiness a bounded number of times: recursing synchronously
    // without limit overflows the call stack when no worker node is ready.
    while (!strategy.hasPoolWorkerNodesReady()) {
      if (!(readinessRetries < maxRetries)) {
        throw new Error(
          `No worker node is ready in the pool after ${readinessRetries} retries`
        )
      }
      readinessRetries++
    }
    return this.executeStrategy(strategy)
  }

  /**
   * Executes the given worker choice strategy.
   *
   * The strategy is asked once, then asked again up to the configured number
   * of retries for as long as it yields no worker node key.
   *
   * @param workerChoiceStrategy - The worker choice strategy.
   * @returns The key of the worker node.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined.
   */
  private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number {
    const maxRetries = this.opts?.retries ?? 0
    let retriesCount = 0
    let workerNodeKey = workerChoiceStrategy.choose()
    while (workerNodeKey == null && retriesCount < maxRetries) {
      retriesCount++
      workerNodeKey = workerChoiceStrategy.choose()
    }
    if (workerNodeKey == null) {
      throw new Error(
        `Worker node key chosen is null or undefined after ${retriesCount} retries`
      )
    }
    return workerNodeKey
  }

  /**
   * Removes the worker node key from the worker choice strategy in the context.
   *
   * @param workerNodeKey - The worker node key.
   * @returns `true` if the removal is successful, `false` otherwise.
   */
  public remove (workerNodeKey: number): boolean {
    return this.getCurrentStrategy().remove(workerNodeKey)
  }

  /**
   * Sets the worker choice strategies in the context options.
   * The merged options are propagated to every strategy instance, not only the current one.
   *
   * @param pool - The pool instance.
   * @param opts - The worker choice strategy options.
   */
  public setOptions (
    pool: IPool<Worker, Data, Response>,
    opts?: InternalWorkerChoiceStrategyOptions
  ): void {
    this.opts = this.buildOptions(pool, opts)
    for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
      workerChoiceStrategy.setOptions(this.opts)
    }
  }

  /**
   * Gets the strategy instance matching the worker choice strategy currently set in the context.
   *
   * @returns The current worker choice strategy instance.
   */
  private getCurrentStrategy (): IWorkerChoiceStrategy {
    return this.workerChoiceStrategies.get(
      this.workerChoiceStrategy
    ) as IWorkerChoiceStrategy
  }

  /**
   * Merges the given options over the default internal options.
   *
   * The defaults are computed from the pool maximum size plus the number of
   * configured weights (presumably to size the default retries count; confirm
   * against `getDefaultInternalWorkerChoiceStrategyOptions`).
   *
   * @param pool - The pool instance.
   * @param opts - The worker choice strategy options taking precedence over the defaults.
   * @returns The merged worker choice strategy options.
   */
  private buildOptions (
    pool: IPool<Worker, Data, Response>,
    opts?: InternalWorkerChoiceStrategyOptions
  ): InternalWorkerChoiceStrategyOptions {
    const weightsCount = Object.keys(
      (opts?.weights as Record<number, number>) ?? {}
    ).length
    return {
      ...getDefaultInternalWorkerChoiceStrategyOptions(
        pool.info.maxSize + weightsCount
      ),
      ...opts
    }
  }
}