fix: ensure task function ops sync worker choice strategies
[poolifier.git] / src / pools / selection-strategies / worker-choice-strategies-context.ts
1 import type { IPool } from '../pool.js'
2 import type { IWorker } from '../worker.js'
3 import type {
4 IWorkerChoiceStrategy,
5 StrategyPolicy,
6 TaskStatisticsRequirements,
7 WorkerChoiceStrategy,
8 WorkerChoiceStrategyOptions
9 } from './selection-strategies-types.js'
10 import { WorkerChoiceStrategies } from './selection-strategies-types.js'
11 import {
12 getWorkerChoiceStrategiesRetries,
13 getWorkerChoiceStrategy
14 } from './selection-strategies-utils.js'
15
/**
 * The worker choice strategies context.
 *
 * Keeps a registry of worker choice strategies keyed by strategy name,
 * aggregates their policies and task statistics requirements, forwards worker
 * node updates/removals to every registered strategy and executes a strategy
 * with a bounded number of retries.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 */
export class WorkerChoiceStrategiesContext<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> {
  /**
   * The number of worker choice strategies execution retries.
   * Starts at zero and is incremented on every retry performed by any execution.
   */
  public retriesCount: number

  /**
   * The default worker choice strategy in the context.
   */
  private defaultWorkerChoiceStrategy: WorkerChoiceStrategy

  /**
   * The worker choice strategies registered in the context.
   */
  private readonly workerChoiceStrategies: Map<
  WorkerChoiceStrategy,
  IWorkerChoiceStrategy
  >

  /**
   * The maximum number of worker choice strategies execution retries.
   */
  private readonly retries: number

  /**
   * Worker choice strategies context constructor.
   *
   * The first entry of `workerChoiceStrategies` becomes the default strategy.
   *
   * @param pool - The pool instance.
   * @param workerChoiceStrategies - The worker choice strategies. @defaultValue [WorkerChoiceStrategies.ROUND_ROBIN]
   * @param opts - The worker choice strategy options.
   */
  public constructor (
    private readonly pool: IPool<Worker, Data, Response>,
    workerChoiceStrategies: WorkerChoiceStrategy[] = [
      WorkerChoiceStrategies.ROUND_ROBIN
    ],
    opts?: WorkerChoiceStrategyOptions
  ) {
    // Bound so that `execute` can be handed around as a standalone callback.
    this.execute = this.execute.bind(this)
    this.defaultWorkerChoiceStrategy = workerChoiceStrategies[0]
    this.workerChoiceStrategies = new Map<
    WorkerChoiceStrategy,
    IWorkerChoiceStrategy
    >()
    for (const workerChoiceStrategy of workerChoiceStrategies) {
      this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
    }
    this.retriesCount = 0
    this.retries = getWorkerChoiceStrategiesRetries<Worker, Data, Response>(
      this.pool,
      opts
    )
  }

  /**
   * Gets the active worker choice strategies in the context policy.
   *
   * Each flag is `true` as soon as one registered strategy sets it.
   *
   * @returns The strategies policy.
   */
  public getPolicy (): StrategyPolicy {
    const policies: StrategyPolicy[] = Array.from(
      this.workerChoiceStrategies.values(),
      strategy => strategy.strategyPolicy
    )
    return {
      dynamicWorkerUsage: policies.some(p => p.dynamicWorkerUsage),
      dynamicWorkerReady: policies.some(p => p.dynamicWorkerReady)
    }
  }

  /**
   * Gets the active worker choice strategies in the context task statistics requirements.
   *
   * Each requirement is `true` as soon as one registered strategy needs it.
   *
   * @returns The task statistics requirements.
   */
  public getTaskStatisticsRequirements (): TaskStatisticsRequirements {
    const requirements: TaskStatisticsRequirements[] = Array.from(
      this.workerChoiceStrategies.values(),
      strategy => strategy.taskStatisticsRequirements
    )
    return {
      runTime: {
        aggregate: requirements.some(r => r.runTime.aggregate),
        average: requirements.some(r => r.runTime.average),
        median: requirements.some(r => r.runTime.median)
      },
      waitTime: {
        aggregate: requirements.some(r => r.waitTime.aggregate),
        average: requirements.some(r => r.waitTime.average),
        median: requirements.some(r => r.waitTime.median)
      },
      elu: {
        aggregate: requirements.some(r => r.elu.aggregate),
        average: requirements.some(r => r.elu.average),
        median: requirements.some(r => r.elu.median)
      }
    }
  }

  /**
   * Sets the default worker choice strategy to use in the context.
   *
   * Nothing happens when the given strategy already is the default one.
   * Otherwise it becomes the default and is registered if not already present.
   *
   * @param workerChoiceStrategy - The default worker choice strategy to set.
   * @param opts - The worker choice strategy options.
   */
  public setDefaultWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy,
    opts?: WorkerChoiceStrategyOptions
  ): void {
    if (workerChoiceStrategy === this.defaultWorkerChoiceStrategy) {
      return
    }
    this.defaultWorkerChoiceStrategy = workerChoiceStrategy
    this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
  }

  /**
   * Updates the worker node key in the active worker choice strategies in the context internals.
   *
   * @param workerNodeKey - The worker node key.
   * @returns `true` if the update is successful, `false` otherwise.
   */
  public update (workerNodeKey: number): boolean {
    let updated = true
    for (const strategy of this.workerChoiceStrategies.values()) {
      // Call first, then combine: every strategy must be updated even after a failure.
      updated = strategy.update(workerNodeKey) && updated
    }
    return updated
  }

  /**
   * Executes the worker choice strategy in the context algorithm.
   *
   * @param workerChoiceStrategy - The worker choice strategy algorithm to execute. @defaultValue this.defaultWorkerChoiceStrategy
   * @returns The key of the worker node.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is not registered in the context.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined.
   */
  public execute (
    workerChoiceStrategy: WorkerChoiceStrategy = this
      .defaultWorkerChoiceStrategy
  ): number {
    const strategy = this.workerChoiceStrategies.get(workerChoiceStrategy)
    if (strategy == null) {
      // Fail with an explicit message instead of a TypeError on `undefined.choose`.
      throw new Error(
        `Worker choice strategy '${workerChoiceStrategy}' is not registered in the context`
      )
    }
    return this.executeStrategy(strategy)
  }

  /**
   * Executes the given worker choice strategy.
   *
   * The strategy is asked once, then asked again up to `retries` times while it
   * keeps returning no worker node key. Only the attempts after the first one
   * are counted as retries.
   *
   * @param workerChoiceStrategy - The worker choice strategy.
   * @returns The key of the worker node.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined.
   */
  private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number {
    let workerNodeKey: number | undefined
    let chooseCount = 0
    let retriesCount = 0
    do {
      workerNodeKey = workerChoiceStrategy.choose()
      // The very first attempt is not a retry.
      if (workerNodeKey == null && chooseCount > 0) {
        ++retriesCount
        ++this.retriesCount
      }
      ++chooseCount
    } while (workerNodeKey == null && retriesCount < this.retries)
    if (workerNodeKey == null) {
      throw new Error(
        `Worker node key chosen is null or undefined after ${retriesCount} retries`
      )
    }
    return workerNodeKey
  }

  /**
   * Removes the worker node key from the active worker choice strategies in the context.
   *
   * @param workerNodeKey - The worker node key.
   * @returns `true` if the removal is successful, `false` otherwise.
   */
  public remove (workerNodeKey: number): boolean {
    let removed = true
    for (const strategy of this.workerChoiceStrategies.values()) {
      // Call first, then combine: every strategy must process the removal even after a failure.
      removed = strategy.remove(workerNodeKey) && removed
    }
    return removed
  }

  /**
   * Sets the active worker choice strategies in the context options.
   *
   * @param opts - The worker choice strategy options.
   */
  public setOptions (opts: WorkerChoiceStrategyOptions | undefined): void {
    for (const strategy of this.workerChoiceStrategies.values()) {
      strategy.setOptions(opts)
    }
  }

  /**
   * Synchronizes the active worker choice strategies in the context with the given worker choice strategies.
   *
   * Registered strategies absent from the given set are removed, then strategies
   * of the given set that are not registered yet are added.
   *
   * @param workerChoiceStrategies - The worker choice strategies to synchronize.
   * @param opts - The worker choice strategy options.
   */
  public syncWorkerChoiceStrategies (
    workerChoiceStrategies: Set<WorkerChoiceStrategy>,
    opts?: WorkerChoiceStrategyOptions
  ): void {
    // Deleting the entry currently visited while iterating a Map is well defined.
    for (const registered of this.workerChoiceStrategies.keys()) {
      if (!workerChoiceStrategies.has(registered)) {
        this.removeWorkerChoiceStrategy(registered)
      }
    }
    for (const wanted of workerChoiceStrategies) {
      if (!this.workerChoiceStrategies.has(wanted)) {
        this.addWorkerChoiceStrategy(wanted, this.pool, opts)
      }
    }
  }

  /**
   * Adds a worker choice strategy to the context.
   *
   * An already registered strategy is left untouched.
   *
   * @param workerChoiceStrategy - The worker choice strategy to add.
   * @param pool - The pool instance handed to the created strategy.
   * @param opts - The worker choice strategy options.
   * @returns The worker choice strategies.
   */
  private addWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy,
    pool: IPool<Worker, Data, Response>,
    opts?: WorkerChoiceStrategyOptions
  ): Map<WorkerChoiceStrategy, IWorkerChoiceStrategy> {
    if (this.workerChoiceStrategies.has(workerChoiceStrategy)) {
      return this.workerChoiceStrategies
    }
    return this.workerChoiceStrategies.set(
      workerChoiceStrategy,
      getWorkerChoiceStrategy<Worker, Data, Response>(
        workerChoiceStrategy,
        pool,
        this,
        opts
      )
    )
  }

  /**
   * Removes a worker choice strategy from the context.
   *
   * @param workerChoiceStrategy - The worker choice strategy to remove.
   * @returns `true` if the worker choice strategy is removed, `false` otherwise.
   */
  private removeWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): boolean {
    return this.workerChoiceStrategies.delete(workerChoiceStrategy)
  }
}