Check that a pool has a minimum number of workers (#213)
[poolifier.git] / src / pools / abstract-pool.ts
1 import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4 } from '../utility-types'
5 import type { IPoolInternal } from './pool-internal'
6 import { PoolEmitter } from './pool-internal'
7 import type { WorkerChoiceStrategy } from './selection-strategies'
8 import {
9 WorkerChoiceStrategies,
10 WorkerChoiceStrategyContext
11 } from './selection-strategies'
12
/**
 * Shared no-op callback.
 *
 * Serves as the fallback listener when no handler was supplied for a worker event.
 */
const EMPTY_FUNCTION = (): void => {
  // Deliberately does nothing.
}
19
/**
 * Signature of the listener attached to a worker's `error` event.
 *
 * The `this` context is typed as the worker and the raised error is the only argument.
 */
export type ErrorHandler<Worker> = (this: Worker, error: Error) => void
24
/**
 * Signature of the listener attached to a worker's `online` event.
 *
 * The `this` context is typed as the worker; the listener takes no arguments.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
29
/**
 * Signature of the listener attached to a worker's `exit` event.
 *
 * The `this` context is typed as the worker and the exit code is the only argument.
 */
export type ExitHandler<Worker> = (this: Worker, exitCode: number) => void
34
/**
 * Minimal event-listener contract that every pool worker has to fulfil.
 */
export interface IWorker {
  /**
   * Attaches a listener to the `error` event.
   *
   * @param event Always `'error'`.
   * @param handler Listener receiving the error.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Attaches a listener to the `online` event.
   *
   * @param event Always `'online'`.
   * @param handler Listener for the online event.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Attaches a listener to the `exit` event.
   *
   * @param event Always `'exit'`.
   * @param handler Listener receiving the exit code.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Attaches a listener to the `exit` event that is performed only once.
   *
   * @param event Always `'exit'`.
   * @param handler Listener receiving the exit code.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
68
/**
 * Configuration accepted by a poolifier pool.
 */
export interface PoolOptions<Worker> {
  /**
   * Listener registered for the `error` event of each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * Listener registered for the `online` event of each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * Listener registered for the `exit` event of each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * Only there to avoid non-useful warning messages.
   *
   * The value is used to set `maxListeners` on event emitters (workers are event emitters).
   *
   * @default 1000
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number
  /**
   * Strategy the pool uses to choose a worker.
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
}
99
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /**
   * The promise map.
   *
   * - `key`: This is the message ID of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
   */
  protected promiseMap: Map<
    number,
    PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /** @inheritdoc */
  public readonly workers: Worker[] = []

  /** @inheritdoc */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** @inheritdoc */
  public readonly emitter: PoolEmitter

  /**
   * ID of the last submitted message; it is incremented before each new task is sent,
   * so the first task gets ID `1`.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Default to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * The workers are created before `emitter` and `workerChoiceStrategyContext`
   * are assigned, so hooks running during worker creation (`setupHook`,
   * `createWorker`, `afterWorkerSetup`) cannot rely on those two members yet.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
   * @throws Error if called from a worker, if `numberOfWorkers` is invalid or if `filePath` is empty.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    this.emitter = new PoolEmitter()
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    )
  }

  /**
   * Ensures that a worker file was given.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if `filePath` is empty.
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Ensures that the requested number of workers is usable: it must be specified,
   * be a safe integer, not be negative and, for a non dynamic pool, not be zero.
   *
   * @param numberOfWorkers Number of workers requested for this pool.
   * @throws Error if one of the conditions above is not met.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new Error(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new Error(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (!this.isDynamic() && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /** @inheritdoc */
  public isDynamic (): boolean {
    return false
  }

  /** @inheritdoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritdoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    const messageId = ++this.nextMessageId
    // Register the pending promise before the message leaves, so a response
    // always finds its entry in the promise map.
    const res = this.internalExecute(worker, messageId)
    // Only a missing payload (`null`/`undefined`) is replaced by an empty
    // object; valid falsy payloads such as `0`, `false` or `''` are sent as-is.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /** @inheritdoc */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /** @inheritdoc */
  public abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase by one the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are increased.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decrease by one the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are decreased.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Step the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws Error if the worker is not registered in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksInProgress + step)
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not registered in the pool leaves the workers list untouched.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // `indexOf` yields -1 for an unknown worker and `splice(-1, 1)` would then
    // evict the last registered worker instead, so only splice on a real hit.
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * The choice is delegated to the worker choice strategy context, which
   * defaults to a round robin strategy.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /** @inheritdoc */
  public abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Creates the promise of a submitted task and stores its settle functions,
   * together with the worker, in the promise map under the message ID.
   *
   * @param worker Worker the task is sent to.
   * @param messageId ID of the message carrying the task.
   * @returns Promise settled by the worker listener once a response with this ID arrives.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /** @inheritdoc */
  public createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    // Fall back to a no-op listener when no handler was given in the options.
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // An exited worker is dropped from the pool data structures.
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * Messages without an ID, or with an ID that has no pending promise, are ignored.
   *
   * @returns The listener function to execute when a message is sent from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    const listener: (message: MessageValue<Response>) => void = message => {
      if (message.id != null) {
        const value = this.promiseMap.get(message.id)
        if (value !== undefined) {
          this.decreaseWorkersTasks(value.worker)
          if (message.error) value.reject(message.error)
          else value.resolve(message.data as Response)
          this.promiseMap.delete(message.id)
        }
      }
    }
    return listener
  }
}