Tests enhancement (#172)
[poolifier.git] / src / pools / abstract-pool.ts
1 import EventEmitter from 'events'
2 import type { MessageValue } from '../utility-types'
3 import type { IPool } from './pool'
4
5 /**
6 * Callback invoked if the worker raised an error.
7 */
8 export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
9
10 /**
11 * Callback invoked when the worker has started successfully.
12 */
13 export type OnlineHandler<Worker> = (this: Worker) => void
14
15 /**
16 * Callback invoked when the worker exits successfully.
17 */
18 export type ExitHandler<Worker> = (this: Worker, code: number) => void
19
20 /**
21 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
22 */
23 export interface IWorker {
24 /**
25 * Register a listener to the error event.
26 *
27 * @param event `'error'`.
28 * @param handler The error handler.
29 */
30 on(event: 'error', handler: ErrorHandler<this>): void
31 /**
32 * Register a listener to the online event.
33 *
34 * @param event `'online'`.
35 * @param handler The online handler.
36 */
37 on(event: 'online', handler: OnlineHandler<this>): void
38 /**
39 * Register a listener to the exit event.
40 *
41 * @param event `'exit'`.
42 * @param handler The exit handler.
43 */
44 on(event: 'exit', handler: ExitHandler<this>): void
45 /**
46 * Register a listener to the exit event that will only performed once.
47 *
48 * @param event `'exit'`.
49 * @param handler The exit handler.
50 */
51 once(event: 'exit', handler: ExitHandler<this>): void
52 }
53
54 /**
55 * Options for a poolifier pool.
56 */
57 export interface PoolOptions<Worker> {
58 /**
59 * A function that will listen for error event on each worker.
60 */
61 errorHandler?: ErrorHandler<Worker>
62 /**
63 * A function that will listen for online event on each worker.
64 */
65 onlineHandler?: OnlineHandler<Worker>
66 /**
67 * A function that will listen for exit event on each worker.
68 */
69 exitHandler?: ExitHandler<Worker>
70 /**
71 * This is just to avoid non-useful warning messages.
72 *
73 * Will be used to set `maxListeners` on event emitters (workers are event emitters).
74 *
75 * @default 1000
76 * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
77 */
78 maxTasks?: number
79 }
80
81 /**
82 * Internal poolifier pool emitter.
83 */
84 class PoolEmitter extends EventEmitter {}
85
86 /**
87 * Base class containing some shared logic for all poolifier pools.
88 *
89 * @template Worker Type of worker which manages this pool.
90 * @template Data Type of data sent to the worker.
91 * @template Response Type of response of execution.
92 */
93 export abstract class AbstractPool<
94 Worker extends IWorker,
95 Data = unknown,
96 Response = unknown
97 > implements IPool<Data, Response> {
98 /**
99 * List of currently available workers.
100 */
101 public readonly workers: Worker[] = []
102
103 /**
104 * Index for the next worker.
105 */
106 public nextWorkerIndex: number = 0
107
108 /**
109 * The tasks map.
110 *
111 * - `key`: The `Worker`
112 * - `value`: Number of tasks currently in progress on the worker.
113 */
114 public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
115
116 /**
117 * Emitter on which events can be listened to.
118 *
119 * Events that can currently be listened to:
120 *
121 * - `'FullPool'`
122 */
123 public readonly emitter: PoolEmitter
124
125 /**
126 * ID of the next message.
127 */
128 protected nextMessageId: number = 0
129
130 /**
131 * Constructs a new poolifier pool.
132 *
133 * @param numberOfWorkers Number of workers that this pool should manage.
134 * @param filePath Path to the worker-file.
135 * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
136 */
137 public constructor (
138 public readonly numberOfWorkers: number,
139 public readonly filePath: string,
140 public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
141 ) {
142 if (!this.isMain()) {
143 throw new Error('Cannot start a pool from a worker!')
144 }
145 // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
146 if (!this.filePath) {
147 throw new Error('Please specify a file with a worker implementation')
148 }
149 this.setupHook()
150
151 for (let i = 1; i <= this.numberOfWorkers; i++) {
152 this.createAndSetupWorker()
153 }
154
155 this.emitter = new PoolEmitter()
156 }
157
158 /**
159 * Perform the task specified in the constructor with the data parameter.
160 *
161 * @param data The input for the specified task.
162 * @returns Promise that will be resolved when the task is successfully completed.
163 */
164 public execute (data: Data): Promise<Response> {
165 // Configure worker to handle message with the specified task
166 const worker = this.chooseWorker()
167 this.increaseWorkersTask(worker)
168 const messageId = ++this.nextMessageId
169 const res = this.internalExecute(worker, messageId)
170 this.sendToWorker(worker, { data: data || ({} as Data), id: messageId })
171 return res
172 }
173
174 /**
175 * Shut down every current worker in this pool.
176 */
177 public async destroy (): Promise<void> {
178 await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
179 }
180
181 /**
182 * Shut down given worker.
183 *
184 * @param worker A worker within `workers`.
185 */
186 protected abstract destroyWorker (worker: Worker): void | Promise<void>
187
188 /**
189 * Setup hook that can be overridden by a Poolifier pool implementation
190 * to run code before workers are created in the abstract constructor.
191 */
192 protected setupHook (): void {
193 // Can be overridden
194 }
195
196 /**
197 * Should return whether the worker is the main worker or not.
198 */
199 protected abstract isMain (): boolean
200
201 /**
202 * Increase the number of tasks that the given workers has done.
203 *
204 * @param worker Workers whose tasks are increased.
205 */
206 protected increaseWorkersTask (worker: Worker): void {
207 const numberOfTasksInProgress = this.tasks.get(worker)
208 if (numberOfTasksInProgress !== undefined) {
209 this.tasks.set(worker, numberOfTasksInProgress + 1)
210 } else {
211 throw Error('Worker could not be found in tasks map')
212 }
213 }
214
215 /**
216 * Decrease the number of tasks that the given workers has done.
217 *
218 * @param worker Workers whose tasks are decreased.
219 */
220 protected decreaseWorkersTasks (worker: Worker): void {
221 const numberOfTasksInProgress = this.tasks.get(worker)
222 if (numberOfTasksInProgress !== undefined) {
223 this.tasks.set(worker, numberOfTasksInProgress - 1)
224 } else {
225 throw Error('Worker could not be found in tasks map')
226 }
227 }
228
229 /**
230 * Removes the given worker from the pool.
231 *
232 * @param worker Worker that will be removed.
233 */
234 protected removeWorker (worker: Worker): void {
235 // Clean worker from data structure
236 const workerIndex = this.workers.indexOf(worker)
237 this.workers.splice(workerIndex, 1)
238 this.tasks.delete(worker)
239 }
240
241 /**
242 * Choose a worker for the next task.
243 *
244 * The default implementation uses a round robin algorithm to distribute the load.
245 *
246 * @returns Worker.
247 */
248 protected chooseWorker (): Worker {
249 const chosenWorker = this.workers[this.nextWorkerIndex]
250 this.nextWorkerIndex =
251 this.workers.length - 1 === this.nextWorkerIndex
252 ? 0
253 : this.nextWorkerIndex + 1
254 return chosenWorker
255 }
256
257 /**
258 * Send a message to the given worker.
259 *
260 * @param worker The worker which should receive the message.
261 * @param message The message.
262 */
263 protected abstract sendToWorker (
264 worker: Worker,
265 message: MessageValue<Data>
266 ): void
267
268 protected abstract registerWorkerMessageListener<
269 Message extends Data | Response
270 > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
271
272 protected abstract unregisterWorkerMessageListener<
273 Message extends Data | Response
274 > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
275
276 protected internalExecute (
277 worker: Worker,
278 messageId: number
279 ): Promise<Response> {
280 return new Promise((resolve, reject) => {
281 const listener: (message: MessageValue<Response>) => void = message => {
282 if (message.id === messageId) {
283 this.unregisterWorkerMessageListener(worker, listener)
284 this.decreaseWorkersTasks(worker)
285 if (message.error) reject(message.error)
286 else resolve(message.data as Response)
287 }
288 }
289 this.registerWorkerMessageListener(worker, listener)
290 })
291 }
292
293 /**
294 * Returns a newly created worker.
295 */
296 protected abstract createWorker (): Worker
297
298 /**
299 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
300 *
301 * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
302 *
303 * @param worker The newly created worker.
304 */
305 protected abstract afterWorkerSetup (worker: Worker): void
306
307 /**
308 * Creates a new worker for this pool and sets it up completely.
309 *
310 * @returns New, completely set up worker.
311 */
312 protected createAndSetupWorker (): Worker {
313 const worker: Worker = this.createWorker()
314
315 worker.on('error', this.opts.errorHandler ?? (() => {}))
316 worker.on('online', this.opts.onlineHandler ?? (() => {}))
317 worker.on('exit', this.opts.exitHandler ?? (() => {}))
318 worker.once('exit', () => this.removeWorker(worker))
319
320 this.workers.push(worker)
321
322 // Init tasks map
323 this.tasks.set(worker, 0)
324
325 this.afterWorkerSetup(worker)
326
327 return worker
328 }
329 }