Improve jsdoc comments (#175)
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
c97c7edb
S
1import EventEmitter from 'events'
2import type { MessageValue } from '../utility-types'
3import type { IPool } from './pool'
4
/**
 * Callback invoked if the worker raised an error.
 *
 * The handler is called with the worker as `this` and receives the raised error.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
/**
 * Callback invoked when the worker has started successfully.
 *
 * The handler is called with the worker as `this` and takes no arguments.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
/**
 * Callback invoked when the worker exits.
 *
 * The handler is called with the worker as `this` and receives the exit code.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
19
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 */
export interface IWorker {
  /**
   * Register a listener to the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Register a listener to the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Register a listener to the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Register a listener to the exit event that will only be performed once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
53
/**
 * Options for a poolifier pool.
 *
 * Every option is optional.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * This is just to avoid non-useful warning messages.
   *
   * Will be used to set `maxListeners` on event emitters (workers are event emitters).
   *
   * @default 1000
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number
}
80
/**
 * Internal poolifier pool emitter.
 *
 * A plain `EventEmitter` subclass that adds no members of its own.
 */
class PoolEmitter extends EventEmitter {}
85
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * Concrete pools supply the worker creation, messaging and shutdown primitives
 * through the abstract methods; this class wires the workers up, tracks the
 * number of tasks in progress per worker and dispatches tasks.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker.
 * @template Response Type of response of execution.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Data, Response> {
  /**
   * List of currently available workers.
   */
  public readonly workers: Worker[] = []

  /**
   * Index for the next worker.
   */
  public nextWorkerIndex: number = 0

  /**
   * The tasks map.
   *
   * - `key`: The `Worker`
   * - `value`: Number of tasks currently in progress on the worker.
   */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /**
   * Emitter on which events can be listened to.
   *
   * Events that can currently be listened to:
   *
   * - `'FullPool'`
   */
  public readonly emitter: PoolEmitter

  /**
   * ID of the next message.
   */
  protected nextMessageId: number = 0

  /**
   * Constructs a new poolifier pool.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
   * @throws {Error} If called from a worker, or if `filePath` is empty.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
    if (!this.filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }

    // Create the emitter before any worker exists so that hooks running during
    // worker setup never observe an undefined emitter.
    this.emitter = new PoolEmitter()

    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }
  }

  /**
   * Number of workers that this pool should manage.
   *
   * @returns Number of workers that this pool manages.
   * @deprecated Only here for backward compatibility.
   */
  // eslint-disable-next-line spellcheck/spell-checker
  public get numWorkers (): number {
    return this.numberOfWorkers
  }

  /**
   * Index for the next worker.
   *
   * @returns Index for the next worker.
   * @deprecated Only here for backward compatibility.
   */
  public get nextWorker (): number {
    return this.nextWorkerIndex
  }

  /**
   * Perform the task specified in the constructor with the data parameter.
   *
   * @param data The input for the specified task. `null` / `undefined` is replaced by an empty object.
   * @returns Promise that will be resolved when the task is successfully completed.
   */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    const messageId = ++this.nextMessageId
    // Register the response listener before sending, so a fast answer cannot be missed.
    const res = this.internalExecute(worker, messageId)
    // `??` instead of `||`: falsy but valid payloads (0, '', false) must reach the worker untouched.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /**
   * Shut down every current worker in this pool.
   */
  public async destroy (): Promise<void> {
    // Iterate over a snapshot: exit handlers remove workers from `this.workers`,
    // and mutating the array while mapping over it would skip workers.
    await Promise.all(
      [...this.workers].map(worker => this.destroyWorker(worker))
    )
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are increased.
   * @throws {Error} If the worker is not registered in the tasks map.
   */
  protected increaseWorkersTask (worker: Worker): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksInProgress + 1)
  }

  /**
   * Decrease the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are decreased.
   * @throws {Error} If the worker is not registered in the tasks map.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksInProgress - 1)
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not part of the pool leaves `workers` untouched.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // Guard against -1: `splice(-1, 1)` would remove the last worker instead.
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
      // Keep the round robin cursor inside the shrunken list.
      if (this.nextWorkerIndex >= this.workers.length) {
        this.nextWorkerIndex = 0
      }
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * The default implementation uses a round robin algorithm to distribute the load.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    const chosenWorker = this.workers[this.nextWorkerIndex]
    // `>=` rather than `===` so that a cursor at or past the last index always wraps.
    this.nextWorkerIndex =
      this.nextWorkerIndex >= this.workers.length - 1
        ? 0
        : this.nextWorkerIndex + 1
    return chosenWorker
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Register a listener for the messages coming from the given worker.
   *
   * @param worker The worker whose messages should be listened to.
   * @param listener The message listener.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Unregister a listener previously passed to `registerWorkerMessageListener`.
   *
   * @param worker The worker the listener was registered on.
   * @param listener The message listener to remove.
   */
  protected abstract unregisterWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Wait for the worker's answer to the message with the given id.
   *
   * Messages carrying another id are ignored. On the matching message the
   * listener is unregistered and the worker's in-progress task count is decreased.
   *
   * @param worker The worker expected to answer.
   * @param messageId Id of the message whose answer is awaited.
   * @returns Promise resolved with the message `data`, or rejected with the message `error` when it is set.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise((resolve, reject) => {
      const listener: (message: MessageValue<Response>) => void = message => {
        if (message.id === messageId) {
          this.unregisterWorkerMessageListener(worker, listener)
          this.decreaseWorkersTasks(worker)
          if (message.error) reject(message.error)
          else resolve(message.data as Response)
        }
      }
      this.registerWorkerMessageListener(worker, listener)
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or to bind the `main-worker`<->`worker` connection if it is not bound by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * The configured handlers (or no-ops) are attached, the worker is added to
   * `workers` with a task count of zero, and `afterWorkerSetup` is called.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    worker.on('error', this.opts.errorHandler ?? (() => {}))
    worker.on('online', this.opts.onlineHandler ?? (() => {}))
    worker.on('exit', this.opts.exitHandler ?? (() => {}))
    // Drop the worker from the pool bookkeeping as soon as it exits.
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }
}