Fix strategy handling in pool options (#259)
[poolifier.git] / src / pools / abstract-pool.ts
1 import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4 } from '../utility-types'
5 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
6 import type { IPoolInternal } from './pool-internal'
7 import { PoolEmitter, PoolType } from './pool-internal'
8 import type { WorkerChoiceStrategy } from './selection-strategies'
9 import {
10 WorkerChoiceStrategies,
11 WorkerChoiceStrategyContext
12 } from './selection-strategies'
13
/**
 * Shared no-op callback.
 *
 * Takes no arguments, does nothing and returns `undefined`.
 */
const EMPTY_FUNCTION = (): void => {
  // Deliberately left without a body
}
20
/**
 * Signature of a listener for a worker's `'error'` event.
 *
 * The handler is typed with the worker as `this` and receives the error as its only argument.
 */
export type ErrorHandler<Worker> = (this: Worker, error: Error) => void
25
/**
 * Signature of a listener for a worker's `'online'` event.
 *
 * The handler is typed with the worker as `this` and takes no arguments.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
30
/**
 * Signature of a listener for a worker's `'exit'` event.
 *
 * The handler is typed with the worker as `this` and receives the numeric exit code.
 */
export type ExitHandler<Worker> = (this: Worker, exitCode: number) => void
35
/**
 * Minimal event-listener contract that a worker must fulfil to be managed by a pool.
 *
 * Only the `'error'`, `'online'` and `'exit'` events are required.
 */
export interface IWorker {
  /**
   * Registers a listener for the `'error'` event.
   *
   * @param event Always `'error'`.
   * @param handler Listener receiving the error.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Registers a listener for the `'online'` event.
   *
   * @param event Always `'online'`.
   * @param handler Listener taking no arguments.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Registers a listener for the `'exit'` event.
   *
   * @param event Always `'exit'`.
   * @param handler Listener receiving the exit code.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Registers a listener for the `'exit'` event that will be invoked only once.
   *
   * @param event Always `'exit'`.
   * @param handler Listener receiving the exit code.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
69
/**
 * Optional settings accepted by a poolifier pool.
 */
export interface PoolOptions<Worker> {
  /**
   * Listener attached to the `'error'` event of every worker created by the pool.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * Listener attached to the `'online'` event of every worker created by the pool.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * Listener attached to the `'exit'` event of every worker created by the pool.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * The worker choice strategy to use in this pool.
   *
   * Defaults to `WorkerChoiceStrategies.ROUND_ROBIN` when omitted.
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
  /**
   * Whether the pool emits events.
   *
   * Defaults to `true` when omitted.
   */
  enableEvents?: boolean
}
97
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * It owns the worker registry, the per-worker task counters and the map of
 * pending task promises, and delegates worker selection to a
 * `WorkerChoiceStrategyContext`.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /** @inheritdoc */
  public readonly workers: Worker[] = []

  /** @inheritdoc */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** @inheritdoc */
  public readonly emitter?: PoolEmitter

  /** @inheritdoc */
  public readonly max?: number

  /**
   * The promise map.
   *
   * - `key`: This is the message ID of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
   */
  protected promiseMap: Map<
    number,
    PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /**
   * Counter used to generate message IDs.
   *
   * It is incremented before each use, so the first submitted task gets ID `1`.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Default to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * Validates the arguments, applies option defaults, runs the setup hook,
   * creates the initial workers and finally builds the worker choice strategy context.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool. Missing `workerChoiceStrategy` and `enableEvents` are filled in on this object.
   * @throws Error if called from a worker, or if `numberOfWorkers` or `filePath` is invalid.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    if (this.opts.enableEvents) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      () => {
        // Factory handed to the strategy context to create an extra worker on demand
        const workerCreated = this.createAndSetupWorker()
        this.registerWorkerMessageListener(workerCreated, message => {
          const tasksInProgress = this.tasks.get(workerCreated)
          // NOTE(review): the `tasksInProgress === 0` branch is evaluated for every
          // message received from this worker, not only kill messages - confirm this is intended.
          if (
            isKillBehavior(KillBehaviors.HARD, message.kill) ||
            tasksInProgress === 0
          ) {
            // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
            void this.destroyWorker(workerCreated)
          }
        })
        return workerCreated
      },
      this.opts.workerChoiceStrategy
    )
  }

  /**
   * Ensures a worker file path was given.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if `filePath` is falsy (e.g. empty string, `null` or `undefined`).
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Validates the requested number of workers.
   *
   * @param numberOfWorkers Number of workers requested for the pool.
   * @throws Error if the value is missing, not a safe integer, negative, or zero for a fixed pool.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new Error(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new Error(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Applies defaults to the pool options stored on `this.opts`.
   *
   * `workerChoiceStrategy` falls back to round robin and `enableEvents` to `true`.
   *
   * @param opts Options supplied by the caller.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    this.opts.workerChoiceStrategy =
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    this.opts.enableEvents = opts.enableEvents ?? true
  }

  /** @inheritdoc */
  public abstract get type (): PoolType

  /** @inheritdoc */
  public get numberOfRunningTasks (): number {
    return this.promiseMap.size
  }

  /** @inheritdoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    // Keep the stored options and the live strategy context in sync
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritdoc */
  public abstract get busy (): boolean

  /**
   * Computes a busy status from the pool's own bookkeeping.
   *
   * @returns `true` when at least `numberOfWorkers` tasks are pending and no worker has a task count of zero.
   */
  protected internalGetBusyStatus (): boolean {
    return (
      this.numberOfRunningTasks >= this.numberOfWorkers &&
      this.findFreeTasksMapEntry() === false
    )
  }

  /** @inheritdoc */
  public findFreeTasksMapEntry (): [Worker, number] | false {
    for (const [worker, numberOfTasks] of this.tasks) {
      if (numberOfTasks === 0) {
        // A worker is free, return the matching tasks map entry
        return [worker, numberOfTasks]
      }
    }
    return false
  }

  /** @inheritdoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    this.checkAndEmitBusy()
    const messageId = ++this.nextMessageId
    const res = this.internalExecute(worker, messageId)
    // `??` only substitutes an empty object for `null`/`undefined`, so valid
    // falsy payloads such as `0`, `''` or `false` reach the worker unchanged.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /** @inheritdoc */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase the number of tasks that the given worker has applied.
   *
   * @param worker Worker whose tasks are increased.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decrease the number of tasks that the given worker has applied.
   *
   * @param worker Worker whose tasks are decreased.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Step the number of tasks that the given worker has applied.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws Error if the worker is not registered in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksInProgress + step)
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not in `workers` leaves the registry untouched.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    if (workerIndex !== -1) {
      // Without this guard, `splice(-1, 1)` would drop the last, unrelated worker
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * Delegates to the worker choice strategy context, which uses round robin unless another strategy is configured.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Register a listener callback on a given worker.
   *
   * @param worker A worker.
   * @param listener A message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Creates the promise for a submitted task and records its settle functions.
   *
   * The promise is settled later by the listener returned from `workerListener`.
   *
   * @param worker Worker the task is assigned to.
   * @param messageId ID of the task message, used as key in the promise map.
   * @returns Promise settled with the worker's response for this message.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * Attaches the configured event handlers, registers the worker with a task
   * count of zero and then calls `afterWorkerSetup`.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // Drop the worker from the pool's bookkeeping once it has exited
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * The returned listener ignores messages without an ID or without a pending
   * promise; otherwise it decrements the worker's task count, settles the
   * promise and removes it from the promise map.
   *
   * @returns The listener function to execute when a message is sent from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      if (message.id) {
        const value = this.promiseMap.get(message.id)
        if (value) {
          this.decreaseWorkersTasks(value.worker)
          if (message.error) value.reject(message.error)
          else value.resolve(message.data as Response)
          this.promiseMap.delete(message.id)
        }
      }
    }
  }

  /**
   * Emits the `'busy'` event when events are enabled and the pool reports itself busy.
   */
  private checkAndEmitBusy (): void {
    if (this.opts.enableEvents && this.busy) {
      this.emitter?.emit('busy')
    }
  }
}