De-duplicate code for workers (#154)
[poolifier.git] / src / pools / abstract-pool.ts
1 import EventEmitter from 'events'
2 import type { MessageValue } from '../utility-types'
3 import type { IPool } from './pool'
4
5 /**
6 * Callback invoked if the worker raised an error.
7 */
8 export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
9
10 /**
11 * Callback invoked when the worker has started successfully.
12 */
13 export type OnlineHandler<Worker> = (this: Worker) => void
14
15 /**
16 * Callback invoked when the worker exits successfully.
17 */
18 export type ExitHandler<Worker> = (this: Worker, code: number) => void
19
20 /**
21 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
22 */
23 export interface IWorker {
24 on(event: 'error', handler: ErrorHandler<this>): void
25 on(event: 'online', handler: OnlineHandler<this>): void
26 on(event: 'exit', handler: ExitHandler<this>): void
27 once(event: 'exit', handler: ExitHandler<this>): void
28 }
29
30 /**
31 * Options for a poolifier pool.
32 */
33 export interface PoolOptions<Worker> {
34 /**
35 * A function that will listen for error event on each worker.
36 */
37 errorHandler?: ErrorHandler<Worker>
38 /**
39 * A function that will listen for online event on each worker.
40 */
41 onlineHandler?: OnlineHandler<Worker>
42 /**
43 * A function that will listen for exit event on each worker.
44 */
45 exitHandler?: ExitHandler<Worker>
46 /**
47 * This is just to avoid non-useful warning messages.
48 *
49 * Will be used to set `maxListeners` on event emitters (workers are event emitters).
50 *
51 * @default 1000
52 * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
53 */
54 maxTasks?: number
55 }
56
57 /**
58 * Internal poolifier pool emitter.
59 */
60 class PoolEmitter extends EventEmitter {}
61
62 /**
63 * Base class containing some shared logic for all poolifier pools.
64 *
65 * @template Worker Type of worker which manages this pool.
66 * @template Data Type of data sent to the worker.
67 * @template Response Type of response of execution.
68 */
69 export abstract class AbstractPool<
70 Worker extends IWorker,
71 Data = unknown,
72 Response = unknown
73 > implements IPool<Data, Response> {
74 /**
75 * List of currently available workers.
76 */
77 public readonly workers: Worker[] = []
78
79 /**
80 * Index for the next worker.
81 */
82 public nextWorkerIndex: number = 0
83
84 /**
85 * - `key`: The `Worker`
86 * - `value`: Number of tasks that has been assigned to that worker since it started
87 */
88 public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
89
90 /**
91 * Emitter on which events can be listened to.
92 *
93 * Events that can currently be listened to:
94 *
95 * - `'FullPool'`
96 */
97 public readonly emitter: PoolEmitter
98
99 /**
100 * ID of the next message.
101 */
102 protected nextMessageId: number = 0
103
104 /**
105 * Constructs a new poolifier pool.
106 *
107 * @param numberOfWorkers Number of workers that this pool should manage.
108 * @param filePath Path to the worker-file.
109 * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
110 */
111 public constructor (
112 public readonly numberOfWorkers: number,
113 public readonly filePath: string,
114 public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
115 ) {
116 if (!this.isMain()) {
117 throw new Error('Cannot start a pool from a worker!')
118 }
119 // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
120 if (!this.filePath) {
121 throw new Error('Please specify a file with a worker implementation')
122 }
123
124 this.setupHook()
125
126 for (let i = 1; i <= this.numberOfWorkers; i++) {
127 this.createAndSetupWorker()
128 }
129
130 this.emitter = new PoolEmitter()
131 }
132
133 /**
134 * Number of workers that this pool should manage.
135 *
136 * @returns Number of workers that this pool manages.
137 * @deprecated Only here for backward compatibility.
138 */
139 // eslint-disable-next-line spellcheck/spell-checker
140 public get numWorkers (): number {
141 return this.numberOfWorkers
142 }
143
144 /**
145 * Index for the next worker.
146 *
147 * @returns Index for the next worker.
148 * @deprecated Only here for backward compatibility.
149 */
150 public get nextWorker (): number {
151 return this.nextWorkerIndex
152 }
153
154 public execute (data: Data): Promise<Response> {
155 // Configure worker to handle message with the specified task
156 const worker = this.chooseWorker()
157 this.increaseWorkersTask(worker)
158 const messageId = ++this.nextMessageId
159 const res = this.internalExecute(worker, messageId)
160 this.sendToWorker(worker, { data: data || ({} as Data), id: messageId })
161 return res
162 }
163
164 public async destroy (): Promise<void> {
165 await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
166 }
167
168 /**
169 * Shut down given worker.
170 *
171 * @param worker A worker within `workers`.
172 */
173 protected abstract destroyWorker (worker: Worker): void | Promise<void>
174
175 /**
176 * Setup hook that can be overridden by a Poolifier pool implementation
177 * to run code before workers are created in the abstract constructor.
178 */
179 protected setupHook (): void {
180 // Can be overridden
181 }
182
183 /**
184 * Should return whether the worker is the main worker or not.
185 */
186 protected abstract isMain (): boolean
187
188 /**
189 * Increase the number of tasks that the given workers has done.
190 *
191 * @param worker Workers whose tasks are increased.
192 */
193 protected increaseWorkersTask (worker: Worker): void {
194 const numberOfTasksTheWorkerHas = this.tasks.get(worker)
195 if (numberOfTasksTheWorkerHas !== undefined) {
196 this.tasks.set(worker, numberOfTasksTheWorkerHas + 1)
197 } else {
198 throw Error('Worker could not be found in tasks map')
199 }
200 }
201
202 /**
203 * Removes the given worker from the pool.
204 *
205 * @param worker Worker that will be removed.
206 */
207 protected removeWorker (worker: Worker): void {
208 // Clean worker from data structure
209 const workerIndex = this.workers.indexOf(worker)
210 this.workers.splice(workerIndex, 1)
211 this.tasks.delete(worker)
212 }
213
214 /**
215 * Choose a worker for the next task.
216 *
217 * The default implementation uses a round robin algorithm to distribute the load.
218 *
219 * @returns Worker.
220 */
221 protected chooseWorker (): Worker {
222 const chosenWorker = this.workers[this.nextWorkerIndex]
223 this.nextWorkerIndex =
224 this.workers.length - 1 === this.nextWorkerIndex
225 ? 0
226 : this.nextWorkerIndex + 1
227 return chosenWorker
228 }
229
230 /**
231 * Send a message to the given worker.
232 *
233 * @param worker The worker which should receive the message.
234 * @param message The message.
235 */
236 protected abstract sendToWorker (
237 worker: Worker,
238 message: MessageValue<Data>
239 ): void
240
241 protected abstract registerWorkerMessageListener (
242 port: Worker,
243 listener: (message: MessageValue<Response>) => void
244 ): void
245
246 protected abstract unregisterWorkerMessageListener (
247 port: Worker,
248 listener: (message: MessageValue<Response>) => void
249 ): void
250
251 protected internalExecute (
252 worker: Worker,
253 messageId: number
254 ): Promise<Response> {
255 return new Promise((resolve, reject) => {
256 const listener: (message: MessageValue<Response>) => void = message => {
257 if (message.id === messageId) {
258 this.unregisterWorkerMessageListener(worker, listener)
259 this.increaseWorkersTask(worker)
260 if (message.error) reject(message.error)
261 else resolve(message.data as Response)
262 }
263 }
264 this.registerWorkerMessageListener(worker, listener)
265 })
266 }
267
268 /**
269 * Returns a newly created worker.
270 */
271 protected abstract createWorker (): Worker
272
273 /**
274 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
275 *
276 * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
277 *
278 * @param worker The newly created worker.
279 */
280 protected abstract afterWorkerSetup (worker: Worker): void
281
282 /**
283 * Creates a new worker for this pool and sets it up completely.
284 *
285 * @returns New, completely set up worker.
286 */
287 protected createAndSetupWorker (): Worker {
288 const worker: Worker = this.createWorker()
289
290 worker.on('error', this.opts.errorHandler ?? (() => {}))
291 worker.on('online', this.opts.onlineHandler ?? (() => {}))
292 worker.on('exit', this.opts.exitHandler ?? (() => {}))
293 worker.once('exit', () => this.removeWorker(worker))
294
295 this.workers.push(worker)
296
297 // Init tasks map
298 this.tasks.set(worker, 0)
299
300 this.afterWorkerSetup(worker)
301
302 return worker
303 }
304 }