0435a32c47067442af13cde6773a7831754e516f
[poolifier.git] / src / pools / abstract-pool.ts
1 import EventEmitter from 'events'
2 import type { MessageValue } from '../utility-types'
3 import type { IPool } from './pool'
4
/**
 * No-op placeholder function.
 *
 * Accepts no arguments, performs no work and returns nothing.
 */
function emptyFunction () {
  // Deliberately empty: there is nothing to do here.
}
11
/**
 * Signature of the callback invoked if the worker raised an error.
 *
 * @template Worker Type the callback receives as `this`.
 */
export type ErrorHandler<Worker> = (
  this: Worker,
  e: Error
) => void
16
/**
 * Signature of the callback invoked when the worker has started successfully.
 *
 * @template Worker Type the callback receives as `this`.
 */
export type OnlineHandler<Worker> = (
  this: Worker
) => void
21
/**
 * Signature of the callback invoked when the worker exits.
 *
 * The callback receives a numeric `code` (presumably the worker's exit code —
 * confirm against the concrete worker implementation).
 *
 * @template Worker Type the callback receives as `this`.
 */
export type ExitHandler<Worker> = (
  this: Worker,
  code: number
) => void
26
/**
 * Minimal listener-registration contract that every pool worker has to provide.
 *
 * A pool only needs to subscribe to the `'error'`, `'online'` and `'exit'` events
 * of its workers; nothing else is required from a worker by this interface.
 */
export interface IWorker {
  /**
   * Subscribes a handler to the `'error'` event.
   *
   * @param event Always `'error'`.
   * @param handler Callback receiving the error.
   */
  on(event: 'error', handler: ErrorHandler<this>): void

  /**
   * Subscribes a handler to the `'online'` event.
   *
   * @param event Always `'online'`.
   * @param handler Callback for the online event.
   */
  on(event: 'online', handler: OnlineHandler<this>): void

  /**
   * Subscribes a handler to the `'exit'` event.
   *
   * @param event Always `'exit'`.
   * @param handler Callback receiving the exit code.
   */
  on(event: 'exit', handler: ExitHandler<this>): void

  /**
   * Subscribes a handler to the `'exit'` event that will only be performed once.
   *
   * @param event Always `'exit'`.
   * @param handler Callback receiving the exit code.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
60
/**
 * Optional settings accepted by a poolifier pool.
 *
 * @template Worker Type of worker the handlers are attached to.
 */
export interface PoolOptions<Worker> {
  /**
   * Listener attached to the error event of each worker.
   */
  errorHandler?: ErrorHandler<Worker>

  /**
   * Listener attached to the online event of each worker.
   */
  onlineHandler?: OnlineHandler<Worker>

  /**
   * Listener attached to the exit event of each worker.
   */
  exitHandler?: ExitHandler<Worker>

  /**
   * Value used to set `maxListeners` on event emitters (workers are event emitters).
   *
   * It only exists to avoid non-useful warning messages.
   *
   * @default 1000
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number
}
87
/**
 * Event emitter used internally by poolifier pools.
 *
 * It adds nothing on top of `EventEmitter`; it only gives the pool's emitter
 * a dedicated type.
 */
class PoolEmitter extends EventEmitter {
  // No members of its own: all behavior is inherited from EventEmitter.
}
92
/**
 * Base class containing the logic shared by all poolifier pools.
 *
 * It keeps the worker registry and the per-worker count of in-progress tasks,
 * dispatches tasks to workers (round robin by default) and matches worker
 * responses to the promises returned by `execute`. Everything specific to a
 * kind of worker (creation, destruction, messaging) is delegated to abstract
 * methods.
 *
 * @template Worker Type of worker which this pool manages.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Data, Response> {
  /**
   * List of currently available workers.
   */
  public readonly workers: Worker[] = []

  /**
   * Index (into `workers`) of the worker the default round robin will pick next.
   */
  public nextWorkerIndex: number = 0

  /**
   * The tasks map.
   *
   * - `key`: The `Worker`
   * - `value`: Number of tasks currently in progress on the worker.
   */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /**
   * Emitter on which events can be listened to.
   *
   * Events that can currently be listened to:
   *
   * - `'FullPool'` (not emitted by this base class itself; it is up to pool implementations)
   */
  public readonly emitter: PoolEmitter

  /**
   * ID of the last message sent; it is incremented before each new message.
   */
  protected nextMessageId: number = 0

  /**
   * Constructs a new poolifier pool.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
   * @throws {Error} If called from a worker (`isMain()` is false) or if `filePath` is empty.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkFilePath(this.filePath)

    // Create the emitter before any overridable hook runs, so that `setupHook`,
    // `createWorker` and `afterWorkerSetup` never observe it as undefined.
    this.emitter = new PoolEmitter()

    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }
  }

  /**
   * Ensures a worker file has been specified.
   *
   * @param filePath Path to the worker-file.
   * @throws {Error} If `filePath` is empty.
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Perform the task specified in the constructor with the data parameter.
   *
   * @param data The input for the specified task. This can only be serializable data.
   * @returns Promise that will be resolved when the task is successfully completed.
   */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    const messageId = ++this.nextMessageId
    // The response listener is registered before the message is sent.
    const res = this.internalExecute(worker, messageId)
    // Only a missing payload (null / undefined) is replaced by an empty object;
    // legitimate falsy payloads such as 0, '' or false are sent unchanged.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /**
   * Shut down every current worker in this pool.
   */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase by one the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are increased.
   * @throws {Error} If the worker is not registered in the tasks map.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decrease by one the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are decreased.
   * @throws {Error} If the worker is not registered in the tasks map.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Step the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws {Error} If the worker is not registered in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksInProgress + step)
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not part of `workers` leaves the list untouched.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // indexOf yields -1 for an unknown worker; splice(-1, 1) would then
    // silently drop the last (unrelated) worker of the list.
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * The default implementation uses a round robin algorithm to distribute the load.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    // The list may have shrunk since the index was last advanced (see
    // `removeWorker`); wrap around instead of reading past the end.
    if (this.nextWorkerIndex >= this.workers.length) {
      this.nextWorkerIndex = 0
    }
    const chosenWorker = this.workers[this.nextWorkerIndex]
    this.nextWorkerIndex =
      this.nextWorkerIndex + 1 >= this.workers.length
        ? 0
        : this.nextWorkerIndex + 1
    return chosenWorker
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Register a listener for the messages of the given worker.
   *
   * `internalExecute` relies on it to receive the response to a task.
   *
   * @param worker The worker whose messages should be listened to.
   * @param listener The message listener.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Unregister a listener previously passed to `registerWorkerMessageListener`.
   *
   * @param worker The worker the listener was registered on.
   * @param listener The message listener to remove.
   */
  protected abstract unregisterWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Wait for the response to the message identified by `messageId`.
   *
   * A listener is registered on the worker; messages carrying another id are
   * ignored. On the matching message the listener is unregistered, the
   * worker's number of tasks in progress is decreased and the promise is settled.
   *
   * @param worker The worker the task has been assigned to.
   * @param messageId ID of the message whose response is awaited.
   * @returns Promise resolved with the response data, or rejected with the
   * error carried by the response.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise((resolve, reject) => {
      const listener: (message: MessageValue<Response>) => void = message => {
        if (message.id === messageId) {
          this.unregisterWorkerMessageListener(worker, listener)
          this.decreaseWorkersTasks(worker)
          if (message.error) reject(message.error)
          else resolve(message.data as Response)
        }
      }
      this.registerWorkerMessageListener(worker, listener)
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or to bind the `main-worker`<->`worker` connection if not bound by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * The handlers given in `opts` (or a no-op when one is missing) are attached
   * to the worker, the worker is registered in `workers` and `tasks`, and
   * `afterWorkerSetup` is called last.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    worker.on('error', this.opts.errorHandler ?? emptyFunction)
    worker.on('online', this.opts.onlineHandler ?? emptyFunction)
    worker.on('exit', this.opts.exitHandler ?? emptyFunction)
    // A worker that exits is dropped from the pool's data structures.
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }
}