Commit | Line | Data |
---|---|---|
a35560ba S |
1 | import type { IWorker } from './abstract-pool' |
2 | import type { IPoolInternal } from './pool-internal' | |
3 | ||
4 | /** | |
5 | * Enumeration of worker choice strategies. | |
6 | */ | |
7 | export const WorkerChoiceStrategies = Object.freeze({ | |
8 | /** | |
9 | * Round robin worker selection strategy. | |
10 | */ | |
11 | ROUND_ROBIN: 'ROUND_ROBIN', | |
12 | /** | |
13 | * Less recently used worker selection strategy. | |
14 | */ | |
15 | LESS_RECENTLY_USED: 'LESS_RECENTLY_USED' | |
16 | } as const) | |
17 | ||
18 | /** | |
19 | * Worker choice strategy. | |
20 | */ | |
21 | export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies | |
22 | ||
23 | /** | |
24 | * Worker choice strategy interface. | |
25 | * | |
26 | * @template Worker Type of worker which manages the strategy. | |
27 | */ | |
28 | interface IWorkerChoiceStrategy<Worker extends IWorker> { | |
29 | /** | |
30 | * Choose a worker in the pool. | |
31 | */ | |
32 | choose(): Worker | |
33 | } | |
34 | ||
35 | /** | |
36 | * Selects the next worker in a round robin fashion. | |
37 | * | |
38 | * @template Worker Type of worker which manages the strategy. | |
39 | * @template Data Type of data sent to the worker. This can only be serializable data. | |
40 | * @template Response Type of response of execution. This can only be serializable data. | |
41 | */ | |
42 | class RoundRobinWorkerChoiceStrategy<Worker extends IWorker, Data, Response> | |
43 | implements IWorkerChoiceStrategy<Worker> { | |
44 | /** | |
45 | * Index for the next worker. | |
46 | */ | |
47 | private nextWorkerIndex: number = 0 | |
48 | ||
49 | /** | |
50 | * Constructs a worker choice strategy that selects in a round robin fashion. | |
51 | * | |
52 | * @param pool The pool instance. | |
53 | */ | |
54 | public constructor ( | |
55 | private readonly pool: IPoolInternal<Worker, Data, Response> | |
56 | ) {} | |
57 | ||
58 | /** @inheritdoc */ | |
59 | public choose (): Worker { | |
60 | const chosenWorker = this.pool.workers[this.nextWorkerIndex] | |
61 | this.nextWorkerIndex = | |
62 | this.pool.workers.length - 1 === this.nextWorkerIndex | |
63 | ? 0 | |
64 | : this.nextWorkerIndex + 1 | |
65 | return chosenWorker | |
66 | } | |
67 | } | |
68 | ||
69 | /** | |
70 | * Selects the less recently used worker. | |
71 | * | |
72 | * @template Worker Type of worker which manages the strategy. | |
73 | * @template Data Type of data sent to the worker. This can only be serializable data. | |
74 | * @template Response Type of response of execution. This can only be serializable data. | |
75 | */ | |
76 | class LessRecentlyUsedWorkerChoiceStrategy< | |
77 | Worker extends IWorker, | |
78 | Data, | |
79 | Response | |
80 | > implements IWorkerChoiceStrategy<Worker> { | |
81 | /** | |
82 | * Constructs a worker choice strategy that selects based on less recently used. | |
83 | * | |
84 | * @param pool The pool instance. | |
85 | */ | |
86 | public constructor ( | |
87 | private readonly pool: IPoolInternal<Worker, Data, Response> | |
88 | ) {} | |
89 | ||
90 | /** @inheritdoc */ | |
91 | public choose (): Worker { | |
ff5e76e1 | 92 | const isPoolDynamic = this.pool.dynamic |
a35560ba S |
93 | let minNumberOfTasks = Infinity |
94 | // A worker is always found because it picks the one with fewer tasks | |
95 | let lessRecentlyUsedWorker!: Worker | |
96 | for (const [worker, numberOfTasks] of this.pool.tasks) { | |
ff5e76e1 | 97 | if (!isPoolDynamic && numberOfTasks === 0) { |
a35560ba S |
98 | return worker |
99 | } else if (numberOfTasks < minNumberOfTasks) { | |
100 | minNumberOfTasks = numberOfTasks | |
101 | lessRecentlyUsedWorker = worker | |
102 | } | |
103 | } | |
104 | return lessRecentlyUsedWorker | |
105 | } | |
106 | } | |
107 | ||
a35560ba S |
108 | /** |
109 | * Dynamically choose a worker. | |
110 | * | |
111 | * @template Worker Type of worker which manages the strategy. | |
112 | * @template Data Type of data sent to the worker. This can only be serializable data. | |
113 | * @template Response Type of response of execution. This can only be serializable data. | |
114 | */ | |
115 | class DynamicPoolWorkerChoiceStrategy<Worker extends IWorker, Data, Response> | |
116 | implements IWorkerChoiceStrategy<Worker> { | |
117 | private workerChoiceStrategy: IWorkerChoiceStrategy<Worker> | |
118 | ||
119 | /** | |
120 | * Constructs a worker choice strategy for dynamical pools. | |
121 | * | |
122 | * @param pool The pool instance. | |
4a6952ff | 123 | * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool. |
a35560ba S |
124 | * @param workerChoiceStrategy The worker choice strategy when the pull is full. |
125 | */ | |
126 | public constructor ( | |
127 | private readonly pool: IPoolInternal<Worker, Data, Response>, | |
4a6952ff | 128 | private createDynamicallyWorkerCallback: () => Worker, |
a35560ba S |
129 | workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN |
130 | ) { | |
ff5e76e1 | 131 | this.workerChoiceStrategy = SelectionStrategiesUtils.getWorkerChoiceStrategy( |
a35560ba S |
132 | this.pool, |
133 | workerChoiceStrategy | |
134 | ) | |
135 | } | |
136 | ||
a35560ba S |
137 | /** @inheritdoc */ |
138 | public choose (): Worker { | |
ff5e76e1 | 139 | const freeWorker = SelectionStrategiesUtils.findFreeWorkerBasedOnTasks( |
4a6952ff | 140 | this.pool.tasks |
ff5e76e1 | 141 | ) |
a35560ba S |
142 | if (freeWorker) { |
143 | return freeWorker | |
144 | } | |
145 | ||
146 | if (this.pool.workers.length === this.pool.max) { | |
330c983e | 147 | this.pool.emitter.emit('busy') |
a35560ba S |
148 | return this.workerChoiceStrategy.choose() |
149 | } | |
150 | ||
151 | // All workers are busy, create a new worker | |
4a6952ff | 152 | return this.createDynamicallyWorkerCallback() |
a35560ba S |
153 | } |
154 | } | |
155 | ||
156 | /** | |
157 | * The worker choice strategy context. | |
158 | * | |
159 | * @template Worker Type of worker. | |
160 | * @template Data Type of data sent to the worker. This can only be serializable data. | |
161 | * @template Response Type of response of execution. This can only be serializable data. | |
162 | */ | |
163 | export class WorkerChoiceStrategyContext< | |
164 | Worker extends IWorker, | |
165 | Data, | |
166 | Response | |
167 | > { | |
168 | // Will be set by setter in constructor | |
169 | private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker> | |
170 | ||
171 | /** | |
172 | * Worker choice strategy context constructor. | |
173 | * | |
174 | * @param pool The pool instance. | |
4a6952ff | 175 | * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool. |
a35560ba S |
176 | * @param workerChoiceStrategy The worker choice strategy. |
177 | */ | |
178 | public constructor ( | |
179 | private readonly pool: IPoolInternal<Worker, Data, Response>, | |
4a6952ff | 180 | private createDynamicallyWorkerCallback: () => Worker, |
a35560ba S |
181 | workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN |
182 | ) { | |
183 | this.setWorkerChoiceStrategy(workerChoiceStrategy) | |
184 | } | |
185 | ||
186 | /** | |
187 | * Get the worker choice strategy instance specific to the pool type. | |
188 | * | |
189 | * @param workerChoiceStrategy The worker choice strategy. | |
190 | * @returns The worker choice strategy instance for the pool type. | |
191 | */ | |
192 | private getPoolWorkerChoiceStrategy ( | |
193 | workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN | |
194 | ): IWorkerChoiceStrategy<Worker> { | |
ff5e76e1 | 195 | if (this.pool.dynamic) { |
a35560ba S |
196 | return new DynamicPoolWorkerChoiceStrategy( |
197 | this.pool, | |
4a6952ff | 198 | this.createDynamicallyWorkerCallback, |
a35560ba S |
199 | workerChoiceStrategy |
200 | ) | |
201 | } | |
ff5e76e1 JB |
202 | return SelectionStrategiesUtils.getWorkerChoiceStrategy( |
203 | this.pool, | |
204 | workerChoiceStrategy | |
205 | ) | |
a35560ba S |
206 | } |
207 | ||
208 | /** | |
209 | * Set the worker choice strategy to use in the context. | |
210 | * | |
211 | * @param workerChoiceStrategy The worker choice strategy to set. | |
212 | */ | |
213 | public setWorkerChoiceStrategy ( | |
214 | workerChoiceStrategy: WorkerChoiceStrategy | |
215 | ): void { | |
216 | this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy( | |
217 | workerChoiceStrategy | |
218 | ) | |
219 | } | |
220 | ||
221 | /** | |
222 | * Choose a worker with the underlying selection strategy. | |
223 | * | |
224 | * @returns The chosen one. | |
225 | */ | |
226 | public execute (): Worker { | |
227 | return this.workerChoiceStrategy.choose() | |
228 | } | |
229 | } | |
ff5e76e1 JB |
230 | |
231 | /** | |
232 | * Worker selection strategies helpers. | |
233 | */ | |
234 | class SelectionStrategiesUtils { | |
235 | /** | |
236 | * Find a free worker based on number of tasks the worker has applied. | |
237 | * | |
238 | * If a worker was found that has `0` tasks, it is detected as free and will be returned. | |
239 | * | |
240 | * If no free worker was found, `null` will be returned. | |
241 | * | |
4a6952ff | 242 | * @param workerTasksMap The pool worker tasks map. |
ff5e76e1 JB |
243 | * @returns A free worker if there was one, otherwise `null`. |
244 | */ | |
4a6952ff JB |
245 | public static findFreeWorkerBasedOnTasks<Worker extends IWorker> ( |
246 | workerTasksMap: Map<Worker, number> | |
247 | ): Worker | null { | |
248 | for (const [worker, numberOfTasks] of workerTasksMap) { | |
ff5e76e1 JB |
249 | if (numberOfTasks === 0) { |
250 | // A worker is free, use it | |
251 | return worker | |
252 | } | |
253 | } | |
254 | return null | |
255 | } | |
256 | ||
257 | /** | |
258 | * Get the worker choice strategy instance. | |
259 | * | |
260 | * @param pool The pool instance. | |
261 | * @param workerChoiceStrategy The worker choice strategy. | |
262 | * @returns The worker choice strategy instance. | |
263 | */ | |
264 | public static getWorkerChoiceStrategy< | |
265 | Worker extends IWorker, | |
266 | Data, | |
267 | Response | |
268 | > ( | |
269 | pool: IPoolInternal<Worker, Data, Response>, | |
270 | workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN | |
271 | ): IWorkerChoiceStrategy<Worker> { | |
272 | switch (workerChoiceStrategy) { | |
273 | case WorkerChoiceStrategies.ROUND_ROBIN: | |
274 | return new RoundRobinWorkerChoiceStrategy(pool) | |
275 | case WorkerChoiceStrategies.LESS_RECENTLY_USED: | |
276 | return new LessRecentlyUsedWorkerChoiceStrategy(pool) | |
277 | default: | |
278 | throw new Error( | |
279 | `Worker choice strategy '${workerChoiceStrategy}' not found` | |
280 | ) | |
281 | } | |
282 | } | |
283 | } |