d6a6e26d27e87fbb263a9ef5b61e5c1745b60950
[poolifier.git] / src / pools / selection-strategies / worker-choice-strategy-context.ts
1 import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
2 import type { IPool } from '../pool'
3 import type { IWorker } from '../worker'
4 import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy'
5 import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy'
6 import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy'
7 import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy'
8 import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy'
9 import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
10 import type {
11 IWorkerChoiceStrategy,
12 StrategyPolicy,
13 TaskStatisticsRequirements,
14 WorkerChoiceStrategy,
15 WorkerChoiceStrategyOptions
16 } from './selection-strategies-types'
17 import { WorkerChoiceStrategies } from './selection-strategies-types'
18 import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy'
19
20 /**
21 * The worker choice strategy context.
22 *
23 * @typeParam Worker - Type of worker.
24 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
25 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
26 */
27 export class WorkerChoiceStrategyContext<
28 Worker extends IWorker,
29 Data = unknown,
30 Response = unknown
31 > {
32 private readonly workerChoiceStrategies: Map<
33 WorkerChoiceStrategy,
34 IWorkerChoiceStrategy
35 >
36
37 /**
38 * Worker choice strategy context constructor.
39 *
40 * @param pool - The pool instance.
41 * @param workerChoiceStrategy - The worker choice strategy.
42 * @param opts - The worker choice strategy options.
43 */
44 public constructor (
45 pool: IPool<Worker, Data, Response>,
46 private workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN,
47 private opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
48 ) {
49 this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
50 this.execute = this.execute.bind(this)
51 this.workerChoiceStrategies = new Map<
52 WorkerChoiceStrategy,
53 IWorkerChoiceStrategy
54 >([
55 [
56 WorkerChoiceStrategies.ROUND_ROBIN,
57 new (RoundRobinWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
58 pool,
59 opts
60 )
61 ],
62 [
63 WorkerChoiceStrategies.LEAST_USED,
64 new (LeastUsedWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
65 pool,
66 opts
67 )
68 ],
69 [
70 WorkerChoiceStrategies.LEAST_BUSY,
71 new (LeastBusyWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
72 pool,
73 opts
74 )
75 ],
76 [
77 WorkerChoiceStrategies.LEAST_ELU,
78 new (LeastEluWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
79 pool,
80 opts
81 )
82 ],
83 [
84 WorkerChoiceStrategies.FAIR_SHARE,
85 new (FairShareWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
86 pool,
87 opts
88 )
89 ],
90 [
91 WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
92 new (WeightedRoundRobinWorkerChoiceStrategy.bind(this))<
93 Worker,
94 Data,
95 Response
96 >(pool, opts)
97 ],
98 [
99 WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN,
100 new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind(this))<
101 Worker,
102 Data,
103 Response
104 >(pool, opts)
105 ]
106 ])
107 }
108
109 /**
110 * Gets the strategy policy in the context.
111 *
112 * @returns The strategy policy.
113 */
114 public getStrategyPolicy (): StrategyPolicy {
115 return (
116 this.workerChoiceStrategies.get(
117 this.workerChoiceStrategy
118 ) as IWorkerChoiceStrategy
119 ).strategyPolicy
120 }
121
122 /**
123 * Gets the worker choice strategy in the context task statistics requirements.
124 *
125 * @returns The task statistics requirements.
126 */
127 public getTaskStatisticsRequirements (): TaskStatisticsRequirements {
128 return (
129 this.workerChoiceStrategies.get(
130 this.workerChoiceStrategy
131 ) as IWorkerChoiceStrategy
132 ).taskStatisticsRequirements
133 }
134
135 /**
136 * Sets the worker choice strategy to use in the context.
137 *
138 * @param workerChoiceStrategy - The worker choice strategy to set.
139 */
140 public setWorkerChoiceStrategy (
141 workerChoiceStrategy: WorkerChoiceStrategy
142 ): void {
143 if (this.workerChoiceStrategy !== workerChoiceStrategy) {
144 this.workerChoiceStrategy = workerChoiceStrategy
145 }
146 this.workerChoiceStrategies.get(this.workerChoiceStrategy)?.reset()
147 }
148
149 /**
150 * Updates the worker node key in the worker choice strategy in the context internals.
151 *
152 * @returns `true` if the update is successful, `false` otherwise.
153 */
154 public update (workerNodeKey: number): boolean {
155 return (
156 this.workerChoiceStrategies.get(
157 this.workerChoiceStrategy
158 ) as IWorkerChoiceStrategy
159 ).update(workerNodeKey)
160 }
161
162 /**
163 * Executes the worker choice strategy in the context algorithm.
164 *
165 * @returns The key of the worker node.
166 * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined.
167 */
168 public execute (): number {
169 const workerChoiceStrategy = this.workerChoiceStrategies.get(
170 this.workerChoiceStrategy
171 ) as IWorkerChoiceStrategy
172 if (!workerChoiceStrategy.hasPoolWorkerNodesReady()) {
173 // wait for a worker node to be ready without blocking the event loop
174 }
175 return this.executeStrategy(workerChoiceStrategy)
176 }
177
178 /**
179 * Executes the given worker choice strategy.
180 *
181 * @param workerChoiceStrategy - The worker choice strategy.
182 * @returns The key of the worker node.
183 * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined.
184 */
185 private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number {
186 let workerNodeKey: number | undefined
187 let chooseCount = 0
188 let retriesCount = 0
189 do {
190 workerNodeKey = workerChoiceStrategy.choose()
191 if (workerNodeKey == null && chooseCount > 0) {
192 retriesCount++
193 }
194 chooseCount++
195 } while (
196 workerNodeKey == null &&
197 retriesCount < (this.opts.retries as number)
198 )
199 if (workerNodeKey == null) {
200 throw new Error(
201 `Worker node key chosen is null or undefined after ${retriesCount} retries`
202 )
203 }
204 return workerNodeKey
205 }
206
207 /**
208 * Removes the worker node key from the worker choice strategy in the context.
209 *
210 * @param workerNodeKey - The worker node key.
211 * @returns `true` if the removal is successful, `false` otherwise.
212 */
213 public remove (workerNodeKey: number): boolean {
214 return (
215 this.workerChoiceStrategies.get(
216 this.workerChoiceStrategy
217 ) as IWorkerChoiceStrategy
218 ).remove(workerNodeKey)
219 }
220
221 /**
222 * Sets the worker choice strategies in the context options.
223 *
224 * @param opts - The worker choice strategy options.
225 */
226 public setOptions (opts: WorkerChoiceStrategyOptions): void {
227 this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
228 for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
229 workerChoiceStrategy.setOptions(opts)
230 }
231 }
232 }