1 import type { IPool
} from
'../pool.js'
2 import type { IWorker
} from
'../worker.js'
6 TaskStatisticsRequirements
,
8 WorkerChoiceStrategyOptions
9 } from
'./selection-strategies-types.js'
10 import { WorkerChoiceStrategies
} from
'./selection-strategies-types.js'
12 getWorkerChoiceStrategiesRetries
,
13 getWorkerChoiceStrategy
14 } from
'./selection-strategies-utils.js'
/**
 * The worker choice strategies context.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 */
23 export class WorkerChoiceStrategiesContext
<
24 Worker
extends IWorker
,
/**
 * The number of worker choice strategies execution retries.
 */
public retriesCount: number

/**
 * The default worker choice strategy in the context.
 */
private defaultWorkerChoiceStrategy: WorkerChoiceStrategy

/**
 * The worker choice strategies registered in the context,
 * keyed by worker choice strategy.
 */
private readonly workerChoiceStrategies: Map<
  WorkerChoiceStrategy,
  IWorkerChoiceStrategy
>

/**
 * The maximum number of worker choice strategies execution retries.
 */
private readonly retries: number
/**
 * Worker choice strategies context constructor.
 *
 * @param pool - The pool instance.
 * @param workerChoiceStrategies - The worker choice strategies. The first entry becomes the default worker choice strategy. @defaultValue [WorkerChoiceStrategies.ROUND_ROBIN]
 * @param opts - The worker choice strategy options.
 */
public constructor (
  private readonly pool: IPool<Worker, Data, Response>,
  workerChoiceStrategies: WorkerChoiceStrategy[] = [
    WorkerChoiceStrategies.ROUND_ROBIN
  ],
  opts?: WorkerChoiceStrategyOptions
) {
  // Bind once so that `execute` keeps its `this` when used as a detached callback.
  this.execute = this.execute.bind(this)
  this.defaultWorkerChoiceStrategy = workerChoiceStrategies[0]
  this.workerChoiceStrategies = new Map<
    WorkerChoiceStrategy,
    IWorkerChoiceStrategy
  >()
  for (const workerChoiceStrategy of workerChoiceStrategies) {
    this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
  }
  // NOTE(review): the initial value of `retriesCount` and the argument list
  // passed to `getWorkerChoiceStrategiesRetries` were not legible in the
  // reviewed text - confirm both against the helper's signature.
  this.retriesCount = 0
  this.retries = getWorkerChoiceStrategiesRetries<Worker, Data, Response>(
    this.pool,
    opts
  )
}
/**
 * Gets the active worker choice strategies in the context policy.
 *
 * Each flag of the returned policy is `true` as soon as at least one
 * registered worker choice strategy sets it.
 *
 * @returns The strategies policy.
 */
public getPolicy (): StrategyPolicy {
  const policies: StrategyPolicy[] = []
  for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
    policies.push(workerChoiceStrategy.strategyPolicy)
  }
  return {
    dynamicWorkerUsage: policies.some(p => p.dynamicWorkerUsage),
    dynamicWorkerReady: policies.some(p => p.dynamicWorkerReady)
  }
}
/**
 * Gets the active worker choice strategies in the context task statistics requirements.
 *
 * A statistic is required as soon as at least one registered worker choice
 * strategy requires it.
 *
 * @returns The task statistics requirements.
 */
public getTaskStatisticsRequirements (): TaskStatisticsRequirements {
  const taskStatisticsRequirements: TaskStatisticsRequirements[] = []
  for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
    taskStatisticsRequirements.push(
      workerChoiceStrategy.taskStatisticsRequirements
    )
  }
  return {
    runTime: {
      aggregate: taskStatisticsRequirements.some(r => r.runTime.aggregate),
      average: taskStatisticsRequirements.some(r => r.runTime.average),
      median: taskStatisticsRequirements.some(r => r.runTime.median)
    },
    waitTime: {
      aggregate: taskStatisticsRequirements.some(r => r.waitTime.aggregate),
      average: taskStatisticsRequirements.some(r => r.waitTime.average),
      median: taskStatisticsRequirements.some(r => r.waitTime.median)
    },
    elu: {
      aggregate: taskStatisticsRequirements.some(r => r.elu.aggregate),
      average: taskStatisticsRequirements.some(r => r.elu.average),
      median: taskStatisticsRequirements.some(r => r.elu.median)
    }
  }
}
/**
 * Sets the default worker choice strategy to use in the context.
 *
 * Does nothing when the given strategy is already the default one.
 * Otherwise the strategy becomes the default and is registered in the
 * context if it was not registered yet.
 *
 * @param workerChoiceStrategy - The default worker choice strategy to set.
 * @param opts - The worker choice strategy options.
 */
public setDefaultWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  opts?: WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategy !== this.defaultWorkerChoiceStrategy) {
    this.defaultWorkerChoiceStrategy = workerChoiceStrategy
    this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
  }
}
/**
 * Updates the worker node key in the active worker choice strategies in the context internals.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the update is successful, `false` otherwise.
 */
public update (workerNodeKey: number): boolean {
  let updated = true
  for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
    // Not short-circuited on purpose: every registered strategy must be
    // updated, even after one of them has reported a failure.
    if (!workerChoiceStrategy.update(workerNodeKey)) {
      updated = false
    }
  }
  return updated
}
/**
 * Executes the worker choice strategy in the context algorithm.
 *
 * @param workerChoiceStrategy - The worker choice strategy algorithm to execute. @defaultValue this.defaultWorkerChoiceStrategy
 * @returns The key of the worker node.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the worker choice strategy is not registered in the context.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined.
 */
public execute (
  workerChoiceStrategy: WorkerChoiceStrategy = this
    .defaultWorkerChoiceStrategy
): number {
  const strategy = this.workerChoiceStrategies.get(workerChoiceStrategy)
  if (strategy == null) {
    // Previously surfaced as an opaque property access on `undefined`;
    // the error class is kept, only the message is made explicit.
    throw new TypeError(
      `Worker choice strategy '${workerChoiceStrategy}' is not registered in the context`
    )
  }
  return this.executeStrategy(strategy)
}
/**
 * Executes the given worker choice strategy.
 *
 * The strategy is asked to choose a worker node key until it returns one
 * or until the maximum number of retries is reached.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @returns The key of the worker node.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined.
 */
private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number {
  let workerNodeKey: number | undefined
  // NOTE(review): the counter declarations and their increments were not
  // legible in the reviewed text and are reconstructed from the loop and
  // error conditions - confirm, in particular the `this.retriesCount` update.
  let chooseCount = 0
  let retriesCount = 0
  do {
    workerNodeKey = workerChoiceStrategy.choose()
    if (workerNodeKey == null && chooseCount > 0) {
      // The very first attempt is not a retry; only later failures count.
      ++retriesCount
      ++this.retriesCount
    }
    ++chooseCount
  } while (workerNodeKey == null && retriesCount < this.retries)
  if (workerNodeKey == null) {
    throw new Error(
      `Worker node key chosen is null or undefined after ${retriesCount} retries`
    )
  }
  return workerNodeKey
}
/**
 * Removes the worker node key from the active worker choice strategies in the context.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the removal is successful, `false` otherwise.
 */
public remove (workerNodeKey: number): boolean {
  let removed = true
  for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
    // Not short-circuited on purpose: the key must be removed from every
    // registered strategy, even after one of them has reported a failure.
    if (!workerChoiceStrategy.remove(workerNodeKey)) {
      removed = false
    }
  }
  return removed
}
/**
 * Sets the active worker choice strategies in the context options.
 *
 * The same options are forwarded to every registered worker choice strategy.
 *
 * @param opts - The worker choice strategy options.
 */
public setOptions (opts: WorkerChoiceStrategyOptions | undefined): void {
  for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
    workerChoiceStrategy.setOptions(opts)
  }
}
/**
 * Synchronizes the active worker choice strategies in the context with the given worker choice strategies.
 *
 * Registered strategies that are absent from the given set are removed,
 * then strategies of the given set that are not registered yet are added.
 *
 * @param workerChoiceStrategies - The worker choice strategies to synchronize.
 * @param opts - The worker choice strategy options.
 */
public syncWorkerChoiceStrategies (
  workerChoiceStrategies: Set<WorkerChoiceStrategy>,
  opts?: WorkerChoiceStrategyOptions
): void {
  for (const workerChoiceStrategy of this.workerChoiceStrategies.keys()) {
    if (!workerChoiceStrategies.has(workerChoiceStrategy)) {
      this.removeWorkerChoiceStrategy(workerChoiceStrategy)
    }
  }
  for (const workerChoiceStrategy of workerChoiceStrategies) {
    if (!this.workerChoiceStrategies.has(workerChoiceStrategy)) {
      this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
    }
  }
}
/**
 * Adds a worker choice strategy to the context.
 *
 * A strategy that is already registered is left untouched.
 *
 * @param workerChoiceStrategy - The worker choice strategy to add.
 * @param pool - The pool instance.
 * @param opts - The worker choice strategy options.
 * @returns The worker choice strategies.
 */
private addWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): Map<WorkerChoiceStrategy, IWorkerChoiceStrategy> {
  if (!this.workerChoiceStrategies.has(workerChoiceStrategy)) {
    return this.workerChoiceStrategies.set(
      workerChoiceStrategy,
      // NOTE(review): the arguments following `workerChoiceStrategy` were not
      // legible in the reviewed text - confirm `(pool, this, opts)` against
      // the `getWorkerChoiceStrategy` signature.
      getWorkerChoiceStrategy<Worker, Data, Response>(
        workerChoiceStrategy,
        pool,
        this,
        opts
      )
    )
  }
  return this.workerChoiceStrategies
}
/**
 * Removes a worker choice strategy from the context.
 *
 * @param workerChoiceStrategy - The worker choice strategy to remove.
 * @returns `true` if the worker choice strategy is removed, `false` otherwise.
 */
private removeWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): boolean {
  return this.workerChoiceStrategies.delete(workerChoiceStrategy)
}