Commit | Line | Data |
---|---|---|
a35560ba S |
1 | import { isKillBehavior, KillBehaviors } from '../worker/worker-options' |
2 | import type { IWorker } from './abstract-pool' | |
3 | import type { IPoolInternal } from './pool-internal' | |
4 | ||
/**
 * Literal names of the supported worker choice strategies.
 * Each key maps to a string literal equal to the key itself.
 */
const workerChoiceStrategyNames = {
  /**
   * Round robin worker selection strategy.
   */
  ROUND_ROBIN: 'ROUND_ROBIN',
  /**
   * Less recently used worker selection strategy.
   */
  LESS_RECENTLY_USED: 'LESS_RECENTLY_USED'
} as const

/**
 * Enumeration of worker choice strategies, exposed as a frozen object.
 */
export const WorkerChoiceStrategies = Object.freeze(workerChoiceStrategyNames)

/**
 * Worker choice strategy: one of the keys of `WorkerChoiceStrategies`.
 */
export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
23 | ||
/**
 * Contract shared by every worker choice strategy.
 *
 * @template Worker Type of worker which manages the strategy.
 */
interface IWorkerChoiceStrategy<Worker extends IWorker> {
  /**
   * Choose a worker in the pool.
   *
   * @returns The chosen worker.
   */
  choose(): Worker
}
35 | ||
/**
 * Selects the next worker in a round robin fashion.
 *
 * Workers are handed out in the order of `pool.workers`, starting again from
 * the first one once the last one has been returned.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
class RoundRobinWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
implements IWorkerChoiceStrategy<Worker> {
  /**
   * Index for the next worker.
   */
  private nextWorkerIndex = 0

  /**
   * Constructs a worker choice strategy that selects in a round robin fashion.
   *
   * @param pool The pool instance.
   */
  public constructor (
    private readonly pool: IPoolInternal<Worker, Data, Response>
  ) {}

  /**
   * Choose the worker at the current index, then advance the index.
   *
   * @returns The chosen worker (`undefined` at runtime when `pool.workers` is empty).
   */
  public choose (): Worker {
    const { workers } = this.pool
    // The workers list may have shrunk since the previous call (presumably when
    // a worker gets destroyed). Bring the index back in range first, otherwise
    // an out-of-range index would yield `undefined` and never wrap around again.
    if (this.nextWorkerIndex >= workers.length) {
      this.nextWorkerIndex = 0
    }
    const chosenWorker = workers[this.nextWorkerIndex]
    const followingIndex = this.nextWorkerIndex + 1
    // Wrap with `>=` so the index can never run past the end of the list
    this.nextWorkerIndex = followingIndex >= workers.length ? 0 : followingIndex
    return chosenWorker
  }
}
69 | ||
/**
 * Selects the less recently used worker, i.e. the worker registered with the
 * smallest number of tasks in `pool.tasks`.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
class LessRecentlyUsedWorkerChoiceStrategy<
  Worker extends IWorker,
  Data,
  Response
> implements IWorkerChoiceStrategy<Worker> {
  /**
   * Constructs a worker choice strategy that selects based on less recently used.
   *
   * @param pool The pool instance.
   */
  public constructor (
    private readonly pool: IPoolInternal<Worker, Data, Response>
  ) {}

  /**
   * Walk `pool.tasks` in iteration order and pick the worker with the fewest tasks.
   *
   * A worker with `0` tasks is returned immediately. Ties are resolved in
   * favour of the worker met first.
   *
   * @returns The chosen worker (left unassigned when `pool.tasks` yields no entry).
   */
  public choose (): Worker {
    // Assigned as soon as one entry with a finite number of tasks is seen
    let candidate!: Worker
    let fewestTasks = Infinity
    for (const [worker, taskCount] of this.pool.tasks) {
      if (taskCount === 0) {
        // An idle worker cannot be beaten, stop searching
        return worker
      }
      if (taskCount < fewestTasks) {
        candidate = worker
        fewestTasks = taskCount
      }
    }
    return candidate
  }
}
107 | ||
/**
 * Build the strategy instance matching a worker choice strategy name.
 *
 * @param pool The pool instance handed to the created strategy.
 * @param workerChoiceStrategy The worker choice strategy name, round robin by default.
 * @returns The worker choice strategy instance.
 * @throws Error when the name matches none of the known strategies.
 */
function getWorkerChoiceStrategy<Worker extends IWorker, Data, Response> (
  pool: IPoolInternal<Worker, Data, Response>,
  workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
): IWorkerChoiceStrategy<Worker> {
  if (workerChoiceStrategy === WorkerChoiceStrategies.ROUND_ROBIN) {
    return new RoundRobinWorkerChoiceStrategy(pool)
  }
  if (workerChoiceStrategy === WorkerChoiceStrategies.LESS_RECENTLY_USED) {
    return new LessRecentlyUsedWorkerChoiceStrategy(pool)
  }
  // Only reachable with a value outside of the declared strategy names
  throw new Error(
    `Worker choice strategy '${workerChoiceStrategy}' not found`
  )
}
130 | ||
/**
 * Dynamically choose a worker.
 *
 * A worker without tasks is preferred. Otherwise a new worker is created,
 * unless the number of workers equals `pool.max`, in which case the choice is
 * delegated to the configured strategy.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
class DynamicPoolWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
implements IWorkerChoiceStrategy<Worker> {
  /**
   * Strategy used once the pool is full.
   */
  private readonly workerChoiceStrategy: IWorkerChoiceStrategy<Worker>

  /**
   * Constructs a worker choice strategy for dynamical pools.
   *
   * @param pool The pool instance.
   * @param workerChoiceStrategy The worker choice strategy when the pool is full.
   */
  public constructor (
    private readonly pool: IPoolInternal<Worker, Data, Response>,
    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  ) {
    this.workerChoiceStrategy = getWorkerChoiceStrategy(
      this.pool,
      workerChoiceStrategy
    )
  }

  /**
   * Look for a worker that currently has `0` tasks in `pool.tasks`.
   *
   * @returns The first worker found with `0` tasks, otherwise `null`.
   */
  private findFreeWorkerBasedOnTasks (): Worker | null {
    for (const [worker, taskCount] of this.pool.tasks) {
      if (taskCount !== 0) {
        // Busy worker, keep looking
        continue
      }
      return worker
    }
    return null
  }

  /**
   * Choose a free worker, a newly created worker, or a worker picked by the
   * configured strategy, in that order of preference.
   *
   * @returns The chosen worker.
   */
  public choose (): Worker {
    const idleWorker = this.findFreeWorkerBasedOnTasks()
    if (idleWorker) {
      return idleWorker
    }

    if (this.pool.workers.length !== this.pool.max) {
      // All workers are busy and the pool is not full, create a new worker
      const newWorker = this.pool.createAndSetupWorker()
      this.pool.registerWorkerMessageListener(newWorker, message => {
        const pendingTasks = this.pool.tasks.get(newWorker)
        const hardKill = isKillBehavior(KillBehaviors.HARD, message.kill)
        // NOTE(review): the `pendingTasks === 0` case is evaluated for every
        // message handed to this listener, not only for kill messages.
        // Confirm that this is intended.
        if (!hardKill && pendingTasks !== 0) {
          return
        }
        // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
        void this.pool.destroyWorker(newWorker)
      })
      return newWorker
    }

    // The pool is full: notify, then let the configured strategy decide
    this.pool.emitter.emit('FullPool')
    return this.workerChoiceStrategy.choose()
  }
}
204 | ||
/**
 * The worker choice strategy context.
 *
 * Holds the strategy currently in use for a pool and allows swapping it.
 *
 * @template Worker Type of worker.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export class WorkerChoiceStrategyContext<
  Worker extends IWorker,
  Data,
  Response
> {
  // Assigned through `setWorkerChoiceStrategy()` during construction
  private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker>

  /**
   * Worker choice strategy context constructor.
   *
   * @param pool The pool instance.
   * @param workerChoiceStrategy The worker choice strategy, round robin by default.
   */
  public constructor (
    private readonly pool: IPoolInternal<Worker, Data, Response>,
    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  ) {
    this.setWorkerChoiceStrategy(workerChoiceStrategy)
  }

  /**
   * Get the worker choice strategy instance specific to the pool type.
   *
   * When `pool.isDynamic()` is truthy the strategy is wrapped in the dynamic
   * pool strategy, otherwise the plain strategy is built.
   *
   * @param workerChoiceStrategy The worker choice strategy.
   * @returns The worker choice strategy instance for the pool type.
   */
  private getPoolWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
  ): IWorkerChoiceStrategy<Worker> {
    return this.pool.isDynamic()
      ? new DynamicPoolWorkerChoiceStrategy(this.pool, workerChoiceStrategy)
      : getWorkerChoiceStrategy(this.pool, workerChoiceStrategy)
  }

  /**
   * Set the worker choice strategy to use in the context.
   *
   * @param workerChoiceStrategy The worker choice strategy to set.
   */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    const strategy = this.getPoolWorkerChoiceStrategy(workerChoiceStrategy)
    this.workerChoiceStrategy = strategy
  }

  /**
   * Choose a worker with the underlying selection strategy.
   *
   * @returns The chosen one.
   */
  public execute (): Worker {
    const chosenWorker = this.workerChoiceStrategy.choose()
    return chosenWorker
  }
}