README.md: Small refinements
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
import EventEmitter from 'events'
import type { MessageValue } from '../utility-types'
import type { IPool } from './pool'

/**
 * An intentionally empty function.
 *
 * Used as the fallback listener when no handler is supplied in the pool options.
 */
function emptyFunction (): void {
  // intentionally left blank
}
/**
 * Callback invoked if the worker raised an error.
 *
 * Called with the worker as `this` and the raised error as its only argument.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
/**
 * Callback invoked when the worker has started successfully.
 *
 * Called with the worker as `this` and no arguments.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
/**
 * Callback invoked when the worker exits.
 *
 * Called with the worker as `this` and the exit code as its only argument.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 */
export interface IWorker {
  /**
   * Register a listener to the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Register a listener to the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Register a listener to the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Register a listener to the exit event that will only be performed once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
/**
 * Options for a poolifier pool.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * This is just to avoid non-useful warning messages.
   *
   * Will be used to set `maxListeners` on event emitters (workers are event emitters).
   *
   * @default 1000
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number
}
/**
 * Internal poolifier pool emitter.
 */
class PoolEmitter extends EventEmitter {}
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker.
 * @template Response Type of response of execution.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Data, Response> {
  /**
   * List of currently available workers.
   */
  public readonly workers: Worker[] = []

  /**
   * Index for the next worker.
   */
  public nextWorkerIndex: number = 0

  /**
   * The tasks map.
   *
   * - `key`: The `Worker`
   * - `value`: Number of tasks currently in progress on the worker.
   */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /**
   * Emitter on which events can be listened to.
   *
   * Events that can currently be listened to:
   *
   * - `'FullPool'`
   */
  public readonly emitter: PoolEmitter

  /**
   * ID of the next message.
   */
  protected nextMessageId: number = 0

  /**
   * Constructs a new poolifier pool.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
   * @throws Error if called from a worker (`isMain()` is false) or if `filePath` is empty.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkFilePath(this.filePath)

    // Create the emitter before any hook runs so that `setupHook` and
    // `afterWorkerSetup` never observe an undefined emitter.
    this.emitter = new PoolEmitter()

    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }
  }

  /**
   * Ensures a worker file path was given.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if `filePath` is empty.
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Perform the task specified in the constructor with the data parameter.
   *
   * @param data The input for the specified task.
   * @returns Promise that will be resolved when the task is successfully completed.
   */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    const messageId = ++this.nextMessageId
    // Register the response listener before sending, so a fast reply cannot be missed.
    const res = this.internalExecute(worker, messageId)
    // Use `??` so that only `null`/`undefined` fall back to an empty object;
    // valid falsy payloads such as `0`, `''` or `false` are sent unchanged.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /**
   * Shut down every current worker in this pool.
   */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are increased.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decrease the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are decreased.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Step the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws Error if the worker is not registered in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksInProgress + step)
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not part of the pool is ignored.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // `indexOf` yields -1 for an unknown worker, and `splice(-1, 1)` would
    // drop the last (unrelated) worker, so only splice on a real match.
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * The default implementation uses a round robin algorithm to distribute the load.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    const workerCount = this.workers.length
    // A worker may have been removed since the last call, leaving the index
    // past the end of the list; wrap around instead of reading past the end.
    if (this.nextWorkerIndex >= workerCount) {
      this.nextWorkerIndex = 0
    }
    // With an empty pool this is still `undefined`, as before.
    const chosenWorker = this.workers[this.nextWorkerIndex]
    this.nextWorkerIndex =
      this.nextWorkerIndex + 1 >= workerCount ? 0 : this.nextWorkerIndex + 1
    return chosenWorker
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Register a listener for messages coming from the given worker.
   *
   * @param worker The worker to listen to.
   * @param listener The message listener.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Unregister a listener previously registered on the given worker.
   *
   * @param worker The worker the listener was registered on.
   * @param listener The message listener to remove.
   */
  protected abstract unregisterWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Waits for the worker's response to the message with the given id.
   *
   * @param worker The worker that executes the task.
   * @param messageId ID of the message whose response is awaited.
   * @returns Promise resolved with the response data, or rejected with the response error.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise((resolve, reject) => {
      const listener: (message: MessageValue<Response>) => void = message => {
        // Responses to other messages sent to the same worker are ignored here.
        if (message.id === messageId) {
          this.unregisterWorkerMessageListener(worker, listener)
          this.decreaseWorkersTasks(worker)
          if (message.error) reject(message.error)
          else resolve(message.data as Response)
        }
      }
      this.registerWorkerMessageListener(worker, listener)
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    worker.on('error', this.opts.errorHandler ?? emptyFunction)
    worker.on('online', this.opts.onlineHandler ?? emptyFunction)
    worker.on('exit', this.opts.exitHandler ?? emptyFunction)
    // Drop the worker from the registry as soon as it exits.
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }
}