a1ccb4e96277c21895a868e231c0941a030a6371
[poolifier.git] / src / pools / selection-strategies / worker-choice-strategies-context.ts
1 import type { IPool } from '../pool.js'
2 import type { IWorker } from '../worker.js'
3 import type {
4 IWorkerChoiceStrategy,
5 StrategyPolicy,
6 TaskStatisticsRequirements,
7 WorkerChoiceStrategy,
8 WorkerChoiceStrategyOptions,
9 } from './selection-strategies-types.js'
10
11 import { WorkerChoiceStrategies } from './selection-strategies-types.js'
12 import {
13 buildWorkerChoiceStrategiesPolicy,
14 buildWorkerChoiceStrategiesTaskStatisticsRequirements,
15 getWorkerChoiceStrategiesRetries,
16 getWorkerChoiceStrategy,
17 } from './selection-strategies-utils.js'
18
19 /**
20 * The worker choice strategies context.
21 * @typeParam Worker - Type of worker.
22 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
23 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
24 * @internal
25 */
26 export class WorkerChoiceStrategiesContext<
27 Worker extends IWorker,
28 Data = unknown,
29 Response = unknown
30 > {
31 /**
32 * The default worker choice strategy in the context.
33 */
34 private defaultWorkerChoiceStrategy: WorkerChoiceStrategy
35
36 /**
37 * The maximum number of worker choice strategies execution retries.
38 */
39 private readonly retries: number
40
41 /**
42 * The worker choice strategies registered in the context.
43 */
44 private readonly workerChoiceStrategies: Map<
45 WorkerChoiceStrategy,
46 IWorkerChoiceStrategy
47 >
48
49 /**
50 * The active worker choice strategies in the context policy.
51 */
52 private workerChoiceStrategiesPolicy: StrategyPolicy
53
54 /**
55 * The active worker choice strategies in the context task statistics requirements.
56 */
57 private workerChoiceStrategiesTaskStatisticsRequirements: TaskStatisticsRequirements
58
59 /**
60 * The number of worker choice strategies execution retries.
61 */
62 public retriesCount: number
63
64 /**
65 * Worker choice strategies context constructor.
66 * @param pool - The pool instance.
67 * @param workerChoiceStrategies - The worker choice strategies. @defaultValue [WorkerChoiceStrategies.ROUND_ROBIN]
68 * @param opts - The worker choice strategy options.
69 */
70 public constructor (
71 private readonly pool: IPool<Worker, Data, Response>,
72 workerChoiceStrategies: WorkerChoiceStrategy[] = [
73 WorkerChoiceStrategies.ROUND_ROBIN,
74 ],
75 opts?: WorkerChoiceStrategyOptions
76 ) {
77 this.execute = this.execute.bind(this)
78 this.defaultWorkerChoiceStrategy = workerChoiceStrategies[0]
79 this.workerChoiceStrategies = new Map<
80 WorkerChoiceStrategy,
81 IWorkerChoiceStrategy
82 >()
83 for (const workerChoiceStrategy of workerChoiceStrategies) {
84 this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
85 }
86 this.workerChoiceStrategiesPolicy = buildWorkerChoiceStrategiesPolicy(
87 this.workerChoiceStrategies
88 )
89 this.workerChoiceStrategiesTaskStatisticsRequirements =
90 buildWorkerChoiceStrategiesTaskStatisticsRequirements(
91 this.workerChoiceStrategies
92 )
93 this.retriesCount = 0
94 this.retries = getWorkerChoiceStrategiesRetries<Worker, Data, Response>(
95 this.pool,
96 opts
97 )
98 }
99
100 /**
101 * Adds a worker choice strategy to the context.
102 * @param workerChoiceStrategy - The worker choice strategy to add.
103 * @param pool - The pool instance.
104 * @param opts - The worker choice strategy options.
105 * @returns The worker choice strategies.
106 */
107 private addWorkerChoiceStrategy (
108 workerChoiceStrategy: WorkerChoiceStrategy,
109 pool: IPool<Worker, Data, Response>,
110 opts?: WorkerChoiceStrategyOptions
111 ): Map<WorkerChoiceStrategy, IWorkerChoiceStrategy> {
112 if (!this.workerChoiceStrategies.has(workerChoiceStrategy)) {
113 return this.workerChoiceStrategies.set(
114 workerChoiceStrategy,
115 getWorkerChoiceStrategy<Worker, Data, Response>(
116 workerChoiceStrategy,
117 pool,
118 this,
119 opts
120 )
121 )
122 }
123 return this.workerChoiceStrategies
124 }
125
126 /**
127 * Executes the given worker choice strategy.
128 * @param workerChoiceStrategy - The worker choice strategy.
129 * @returns The key of the worker node.
130 * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined.
131 */
132 private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number {
133 let workerNodeKey: number | undefined
134 let chooseCount = 0
135 let retriesCount = 0
136 do {
137 workerNodeKey = workerChoiceStrategy.choose()
138 if (workerNodeKey == null && chooseCount > 0) {
139 ++retriesCount
140 ++this.retriesCount
141 }
142 ++chooseCount
143 } while (workerNodeKey == null && retriesCount < this.retries)
144 if (workerNodeKey == null) {
145 throw new Error(
146 `Worker node key chosen is null or undefined after ${retriesCount.toString()} retries`
147 )
148 }
149 return workerNodeKey
150 }
151
152 /**
153 * Removes a worker choice strategy from the context.
154 * @param workerChoiceStrategy - The worker choice strategy to remove.
155 * @returns `true` if the worker choice strategy is removed, `false` otherwise.
156 */
157 private removeWorkerChoiceStrategy (
158 workerChoiceStrategy: WorkerChoiceStrategy
159 ): boolean {
160 return this.workerChoiceStrategies.delete(workerChoiceStrategy)
161 }
162
163 /**
164 * Executes the given worker choice strategy in the context algorithm.
165 * @param workerChoiceStrategy - The worker choice strategy algorithm to execute. @defaultValue this.defaultWorkerChoiceStrategy
166 * @returns The key of the worker node.
167 * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined.
168 */
169 public execute (
170 workerChoiceStrategy: WorkerChoiceStrategy = this
171 .defaultWorkerChoiceStrategy
172 ): number {
173 return this.executeStrategy(
174 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
175 this.workerChoiceStrategies.get(workerChoiceStrategy)!
176 )
177 }
178
179 /**
180 * Gets the active worker choice strategies in the context policy.
181 * @returns The strategies policy.
182 */
183 public getPolicy (): StrategyPolicy {
184 return this.workerChoiceStrategiesPolicy
185 }
186
187 /**
188 * Gets the active worker choice strategies in the context task statistics requirements.
189 * @returns The strategies task statistics requirements.
190 */
191 public getTaskStatisticsRequirements (): TaskStatisticsRequirements {
192 return this.workerChoiceStrategiesTaskStatisticsRequirements
193 }
194
195 /**
196 * Removes the worker node key from the active worker choice strategies in the context.
197 * @param workerNodeKey - The worker node key.
198 * @returns `true` if the removal is successful, `false` otherwise.
199 */
200 public remove (workerNodeKey: number): boolean {
201 return Array.from(
202 this.workerChoiceStrategies,
203 ([_, workerChoiceStrategy]) => workerChoiceStrategy.remove(workerNodeKey)
204 ).every(r => r)
205 }
206
207 /**
208 * Sets the default worker choice strategy to use in the context.
209 * @param workerChoiceStrategy - The default worker choice strategy to set.
210 * @param opts - The worker choice strategy options.
211 */
212 public setDefaultWorkerChoiceStrategy (
213 workerChoiceStrategy: WorkerChoiceStrategy,
214 opts?: WorkerChoiceStrategyOptions
215 ): void {
216 if (workerChoiceStrategy !== this.defaultWorkerChoiceStrategy) {
217 this.defaultWorkerChoiceStrategy = workerChoiceStrategy
218 this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
219 }
220 }
221
222 /**
223 * Sets the active worker choice strategies in the context options.
224 * @param opts - The worker choice strategy options.
225 */
226 public setOptions (opts: undefined | WorkerChoiceStrategyOptions): void {
227 for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
228 workerChoiceStrategy.setOptions(opts)
229 }
230 }
231
232 /**
233 * Synchronizes the active worker choice strategies in the context with the given worker choice strategies.
234 * @param workerChoiceStrategies - The worker choice strategies to synchronize.
235 * @param opts - The worker choice strategy options.
236 */
237 public syncWorkerChoiceStrategies (
238 workerChoiceStrategies: Set<WorkerChoiceStrategy>,
239 opts?: WorkerChoiceStrategyOptions
240 ): void {
241 for (const workerChoiceStrategy of this.workerChoiceStrategies.keys()) {
242 if (!workerChoiceStrategies.has(workerChoiceStrategy)) {
243 this.removeWorkerChoiceStrategy(workerChoiceStrategy)
244 }
245 }
246 for (const workerChoiceStrategy of workerChoiceStrategies) {
247 if (!this.workerChoiceStrategies.has(workerChoiceStrategy)) {
248 this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
249 }
250 }
251 this.workerChoiceStrategiesPolicy = buildWorkerChoiceStrategiesPolicy(
252 this.workerChoiceStrategies
253 )
254 this.workerChoiceStrategiesTaskStatisticsRequirements =
255 buildWorkerChoiceStrategiesTaskStatisticsRequirements(
256 this.workerChoiceStrategies
257 )
258 }
259
260 /**
261 * Updates the worker node key in the active worker choice strategies in the context internals.
262 * @param workerNodeKey - The worker node key.
263 * @returns `true` if the update is successful, `false` otherwise.
264 */
265 public update (workerNodeKey: number): boolean {
266 return Array.from(
267 this.workerChoiceStrategies,
268 ([_, workerChoiceStrategy]) => workerChoiceStrategy.update(workerNodeKey)
269 ).every(r => r)
270 }
271 }