1772276c709a8cfba220df4aeafda075cdf92109
[poolifier.git] / src / pools / selection-strategies / worker-choice-strategy-context.ts
1 import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
2 import type { IPool } from '../pool'
3 import type { IWorker } from '../worker'
4 import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy'
5 import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy'
6 import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy'
7 import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy'
8 import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy'
9 import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
10 import type {
11 IWorkerChoiceStrategy,
12 StrategyPolicy,
13 TaskStatisticsRequirements,
14 WorkerChoiceStrategy,
15 WorkerChoiceStrategyOptions
16 } from './selection-strategies-types'
17 import { WorkerChoiceStrategies } from './selection-strategies-types'
18 import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy'
19
/**
 * The worker choice strategy context.
 *
 * Holds one instance of every available worker choice strategy and delegates
 * each operation to the instance of the strategy currently selected.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 */
export class WorkerChoiceStrategyContext<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> {
  /** Worker choice strategy instances, keyed by worker choice strategy. */
  private readonly workerChoiceStrategies: Map<
    WorkerChoiceStrategy,
    IWorkerChoiceStrategy
  >

  /**
   * Worker choice strategy context constructor.
   *
   * @param pool - The pool instance.
   * @param workerChoiceStrategy - The worker choice strategy.
   * @param opts - The worker choice strategy options.
   */
  public constructor (
    pool: IPool<Worker, Data, Response>,
    private workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN,
    private opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  ) {
    // The context keeps the options merged over the defaults; the strategies
    // are handed the options exactly as they were passed in.
    this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
    // Allows `execute` to be passed around detached from the context.
    this.execute = this.execute.bind(this)
    this.workerChoiceStrategies = new Map<
      WorkerChoiceStrategy,
      IWorkerChoiceStrategy
    >([
      [
        WorkerChoiceStrategies.ROUND_ROBIN,
        new RoundRobinWorkerChoiceStrategy<Worker, Data, Response>(pool, opts)
      ],
      [
        WorkerChoiceStrategies.LEAST_USED,
        new LeastUsedWorkerChoiceStrategy<Worker, Data, Response>(pool, opts)
      ],
      [
        WorkerChoiceStrategies.LEAST_BUSY,
        new LeastBusyWorkerChoiceStrategy<Worker, Data, Response>(pool, opts)
      ],
      [
        WorkerChoiceStrategies.LEAST_ELU,
        new LeastEluWorkerChoiceStrategy<Worker, Data, Response>(pool, opts)
      ],
      [
        WorkerChoiceStrategies.FAIR_SHARE,
        new FairShareWorkerChoiceStrategy<Worker, Data, Response>(pool, opts)
      ],
      [
        WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
        new WeightedRoundRobinWorkerChoiceStrategy<Worker, Data, Response>(
          pool,
          opts
        )
      ],
      [
        WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN,
        new InterleavedWeightedRoundRobinWorkerChoiceStrategy<
          Worker,
          Data,
          Response
        >(pool, opts)
      ]
    ])
  }

  /**
   * Gets the strategy policy in the context.
   *
   * @returns The strategy policy.
   */
  public getStrategyPolicy (): StrategyPolicy {
    return this.getCurrentWorkerChoiceStrategy().strategyPolicy
  }

  /**
   * Gets the worker choice strategy in the context task statistics requirements.
   *
   * @returns The task statistics requirements.
   */
  public getTaskStatisticsRequirements (): TaskStatisticsRequirements {
    return this.getCurrentWorkerChoiceStrategy().taskStatisticsRequirements
  }

  /**
   * Sets the worker choice strategy to use in the context.
   * The instance of the selected strategy is reset, even when the selection
   * does not change.
   *
   * @param workerChoiceStrategy - The worker choice strategy to set.
   */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    if (this.workerChoiceStrategy !== workerChoiceStrategy) {
      this.workerChoiceStrategy = workerChoiceStrategy
    }
    this.workerChoiceStrategies.get(this.workerChoiceStrategy)?.reset()
  }

  /**
   * Updates the worker node key in the worker choice strategy in the context internals.
   *
   * @param workerNodeKey - The worker node key.
   * @returns `true` if the update is successful, `false` otherwise.
   */
  public update (workerNodeKey: number): boolean {
    return this.getCurrentWorkerChoiceStrategy().update(workerNodeKey)
  }

  /**
   * Executes the worker choice strategy in the context algorithm.
   *
   * The strategy is asked to choose only while it reports pool worker nodes
   * ready. Every choice after the first one counts as a retry.
   *
   * @returns The key of the worker node.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum consecutive worker choice strategy executions has been reached.
   */
  public execute (): number {
    const workerChoiceStrategy = this.getCurrentWorkerChoiceStrategy()
    const maxExecutionCount = 10000
    let workerNodeKey: number | undefined
    let executionCount = 0
    let chooseCount = 0
    let retriesCount = 0
    let needsAnotherExecution: boolean
    do {
      if (workerChoiceStrategy.hasPoolWorkerNodesReady()) {
        workerNodeKey = workerChoiceStrategy.choose()
        if (chooseCount > 0) {
          retriesCount++
        }
        chooseCount++
      }
      executionCount++
      // Another round is needed while the pool is not ready, or while no key
      // has been chosen and the retries budget is not exhausted.
      needsAnotherExecution =
        !workerChoiceStrategy.hasPoolWorkerNodesReady() ||
        (workerNodeKey == null && retriesCount < (this.opts.retries as number))
    } while (needsAnotherExecution && executionCount < maxExecutionCount)
    // The cap is an error only if the loop was cut short or ended without a
    // key: a key chosen on the very last permitted execution is still valid
    // and must not be thrown away.
    if (
      executionCount >= maxExecutionCount &&
      (needsAnotherExecution || workerNodeKey == null)
    ) {
      throw new RangeError(
        `Worker choice strategy consecutive executions has exceeded the maximum of ${maxExecutionCount}`
      )
    }
    if (workerNodeKey == null) {
      throw new Error(
        `Worker node key chosen is null or undefined after ${retriesCount} retries`
      )
    }
    return workerNodeKey
  }

  /**
   * Removes the worker node key from the worker choice strategy in the context.
   *
   * @param workerNodeKey - The worker node key.
   * @returns `true` if the removal is successful, `false` otherwise.
   */
  public remove (workerNodeKey: number): boolean {
    return this.getCurrentWorkerChoiceStrategy().remove(workerNodeKey)
  }

  /**
   * Sets the worker choice strategies in the context options.
   * The context keeps the options merged over the defaults; every strategy
   * instance receives the options exactly as passed in.
   *
   * @param opts - The worker choice strategy options.
   */
  public setOptions (opts: WorkerChoiceStrategyOptions): void {
    this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
    for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
      workerChoiceStrategy.setOptions(opts)
    }
  }

  /**
   * Gets the instance of the worker choice strategy currently selected in the context.
   *
   * @returns The worker choice strategy instance.
   */
  private getCurrentWorkerChoiceStrategy (): IWorkerChoiceStrategy {
    // No runtime check: the lookup result is asserted to be defined.
    return this.workerChoiceStrategies.get(
      this.workerChoiceStrategy
    ) as IWorkerChoiceStrategy
  }
}