Extract selection strategies to classes (#176)
[poolifier.git] / src / pools / selection-strategies.ts
1 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
2 import type { IWorker } from './abstract-pool'
3 import type { IPoolInternal } from './pool-internal'
4
5 /**
6 * Enumeration of worker choice strategies.
7 */
8 export const WorkerChoiceStrategies = Object.freeze({
9 /**
10 * Round robin worker selection strategy.
11 */
12 ROUND_ROBIN: 'ROUND_ROBIN',
13 /**
14 * Less recently used worker selection strategy.
15 */
16 LESS_RECENTLY_USED: 'LESS_RECENTLY_USED'
17 } as const)
18
19 /**
20 * Worker choice strategy.
21 */
22 export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
23
24 /**
25 * Worker choice strategy interface.
26 *
27 * @template Worker Type of worker which manages the strategy.
28 */
29 interface IWorkerChoiceStrategy<Worker extends IWorker> {
30 /**
31 * Choose a worker in the pool.
32 */
33 choose(): Worker
34 }
35
36 /**
37 * Selects the next worker in a round robin fashion.
38 *
39 * @template Worker Type of worker which manages the strategy.
40 * @template Data Type of data sent to the worker. This can only be serializable data.
41 * @template Response Type of response of execution. This can only be serializable data.
42 */
43 class RoundRobinWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
44 implements IWorkerChoiceStrategy<Worker> {
45 /**
46 * Index for the next worker.
47 */
48 private nextWorkerIndex: number = 0
49
50 /**
51 * Constructs a worker choice strategy that selects in a round robin fashion.
52 *
53 * @param pool The pool instance.
54 */
55 public constructor (
56 private readonly pool: IPoolInternal<Worker, Data, Response>
57 ) {}
58
59 /** @inheritdoc */
60 public choose (): Worker {
61 const chosenWorker = this.pool.workers[this.nextWorkerIndex]
62 this.nextWorkerIndex =
63 this.pool.workers.length - 1 === this.nextWorkerIndex
64 ? 0
65 : this.nextWorkerIndex + 1
66 return chosenWorker
67 }
68 }
69
70 /**
71 * Selects the less recently used worker.
72 *
73 * @template Worker Type of worker which manages the strategy.
74 * @template Data Type of data sent to the worker. This can only be serializable data.
75 * @template Response Type of response of execution. This can only be serializable data.
76 */
77 class LessRecentlyUsedWorkerChoiceStrategy<
78 Worker extends IWorker,
79 Data,
80 Response
81 > implements IWorkerChoiceStrategy<Worker> {
82 /**
83 * Constructs a worker choice strategy that selects based on less recently used.
84 *
85 * @param pool The pool instance.
86 */
87 public constructor (
88 private readonly pool: IPoolInternal<Worker, Data, Response>
89 ) {}
90
91 /** @inheritdoc */
92 public choose (): Worker {
93 let minNumberOfTasks = Infinity
94 // A worker is always found because it picks the one with fewer tasks
95 let lessRecentlyUsedWorker!: Worker
96 for (const [worker, numberOfTasks] of this.pool.tasks) {
97 if (numberOfTasks === 0) {
98 return worker
99 } else if (numberOfTasks < minNumberOfTasks) {
100 minNumberOfTasks = numberOfTasks
101 lessRecentlyUsedWorker = worker
102 }
103 }
104 return lessRecentlyUsedWorker
105 }
106 }
107
108 /**
109 * Get the worker choice strategy instance.
110 *
111 * @param pool The pool instance.
112 * @param workerChoiceStrategy The worker choice strategy.
113 * @returns The worker choice strategy instance.
114 */
115 function getWorkerChoiceStrategy<Worker extends IWorker, Data, Response> (
116 pool: IPoolInternal<Worker, Data, Response>,
117 workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
118 ): IWorkerChoiceStrategy<Worker> {
119 switch (workerChoiceStrategy) {
120 case WorkerChoiceStrategies.ROUND_ROBIN:
121 return new RoundRobinWorkerChoiceStrategy(pool)
122 case WorkerChoiceStrategies.LESS_RECENTLY_USED:
123 return new LessRecentlyUsedWorkerChoiceStrategy(pool)
124 default:
125 throw new Error(
126 `Worker choice strategy '${workerChoiceStrategy}' not found`
127 )
128 }
129 }
130
131 /**
132 * Dynamically choose a worker.
133 *
134 * @template Worker Type of worker which manages the strategy.
135 * @template Data Type of data sent to the worker. This can only be serializable data.
136 * @template Response Type of response of execution. This can only be serializable data.
137 */
138 class DynamicPoolWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
139 implements IWorkerChoiceStrategy<Worker> {
140 private workerChoiceStrategy: IWorkerChoiceStrategy<Worker>
141
142 /**
143 * Constructs a worker choice strategy for dynamical pools.
144 *
145 * @param pool The pool instance.
146 * @param workerChoiceStrategy The worker choice strategy when the pull is full.
147 */
148 public constructor (
149 private readonly pool: IPoolInternal<Worker, Data, Response>,
150 workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
151 ) {
152 this.workerChoiceStrategy = getWorkerChoiceStrategy(
153 this.pool,
154 workerChoiceStrategy
155 )
156 }
157
158 /**
159 * Find a free worker based on number of tasks the worker has applied.
160 *
161 * If a worker was found that has `0` tasks, it is detected as free and will be returned.
162 *
163 * If no free worker was found, `null` will be returned.
164 *
165 * @returns A free worker if there was one, otherwise `null`.
166 */
167 private findFreeWorkerBasedOnTasks (): Worker | null {
168 for (const [worker, numberOfTasks] of this.pool.tasks) {
169 if (numberOfTasks === 0) {
170 // A worker is free, use it
171 return worker
172 }
173 }
174 return null
175 }
176
177 /** @inheritdoc */
178 public choose (): Worker {
179 const freeWorker = this.findFreeWorkerBasedOnTasks()
180 if (freeWorker) {
181 return freeWorker
182 }
183
184 if (this.pool.workers.length === this.pool.max) {
185 this.pool.emitter.emit('FullPool')
186 return this.workerChoiceStrategy.choose()
187 }
188
189 // All workers are busy, create a new worker
190 const workerCreated = this.pool.createAndSetupWorker()
191 this.pool.registerWorkerMessageListener(workerCreated, message => {
192 const tasksInProgress = this.pool.tasks.get(workerCreated)
193 if (
194 isKillBehavior(KillBehaviors.HARD, message.kill) ||
195 tasksInProgress === 0
196 ) {
197 // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
198 void this.pool.destroyWorker(workerCreated)
199 }
200 })
201 return workerCreated
202 }
203 }
204
205 /**
206 * The worker choice strategy context.
207 *
208 * @template Worker Type of worker.
209 * @template Data Type of data sent to the worker. This can only be serializable data.
210 * @template Response Type of response of execution. This can only be serializable data.
211 */
212 export class WorkerChoiceStrategyContext<
213 Worker extends IWorker,
214 Data,
215 Response
216 > {
217 // Will be set by setter in constructor
218 private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker>
219
220 /**
221 * Worker choice strategy context constructor.
222 *
223 * @param pool The pool instance.
224 * @param workerChoiceStrategy The worker choice strategy.
225 */
226 public constructor (
227 private readonly pool: IPoolInternal<Worker, Data, Response>,
228 workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
229 ) {
230 this.setWorkerChoiceStrategy(workerChoiceStrategy)
231 }
232
233 /**
234 * Get the worker choice strategy instance specific to the pool type.
235 *
236 * @param workerChoiceStrategy The worker choice strategy.
237 * @returns The worker choice strategy instance for the pool type.
238 */
239 private getPoolWorkerChoiceStrategy (
240 workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
241 ): IWorkerChoiceStrategy<Worker> {
242 if (this.pool.isDynamic()) {
243 return new DynamicPoolWorkerChoiceStrategy(
244 this.pool,
245 workerChoiceStrategy
246 )
247 }
248 return getWorkerChoiceStrategy(this.pool, workerChoiceStrategy)
249 }
250
251 /**
252 * Set the worker choice strategy to use in the context.
253 *
254 * @param workerChoiceStrategy The worker choice strategy to set.
255 */
256 public setWorkerChoiceStrategy (
257 workerChoiceStrategy: WorkerChoiceStrategy
258 ): void {
259 this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy(
260 workerChoiceStrategy
261 )
262 }
263
264 /**
265 * Choose a worker with the underlying selection strategy.
266 *
267 * @returns The chosen one.
268 */
269 public execute (): Worker {
270 return this.workerChoiceStrategy.choose()
271 }
272 }