Merge branch 'master' into issue-70
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
c97c7edb
S
1import EventEmitter from 'events'
2import type { MessageValue } from '../utility-types'
3import type { IPool } from './pool'
4
729c563d
S
/**
 * Callback invoked if the worker raised an error.
 *
 * The handler is called with the worker bound as `this` and receives the raised error.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
729c563d
S
9
/**
 * Callback invoked when the worker has started successfully.
 *
 * The handler is called with the worker bound as `this` and takes no arguments.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
729c563d
S
14
/**
 * Callback invoked when the worker exits.
 *
 * The handler is called with the worker bound as `this` and receives the exit code of the worker.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
19
729c563d
S
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 */
export interface IWorker {
  /**
   * Registers a listener for the `'error'` event.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Registers a listener for the `'online'` event.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Registers a listener for the `'exit'` event.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Registers a listener for the `'exit'` event through `once`.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
29
729c563d
S
/**
 * Options for a poolifier pool.
 *
 * Every option is optional.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * This is just to avoid non-useful warning messages.
   *
   * Will be used to set `maxListeners` on event emitters (workers are event emitters).
   *
   * @default 1000
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number
}
56
729c563d
S
/**
 * Internal poolifier pool emitter.
 *
 * Plain `EventEmitter` subclass without additional members.
 */
class PoolEmitter extends EventEmitter {}
61
729c563d
S
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker.
 * @template Response Type of response of execution.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Data, Response> {
  /**
   * List of currently available workers.
   */
  public readonly workers: Worker[] = []

  /**
   * Index for the next worker.
   */
  public nextWorkerIndex: number = 0

  /**
   * - `key`: The `Worker`
   * - `value`: Number of tasks currently in progress on the worker.
   */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /**
   * Emitter on which events can be listened to.
   *
   * Events that can currently be listened to:
   *
   * - `'FullPool'`
   *
   * Initialised as a field so that it already exists while the constructor runs the
   * overridable hooks (`setupHook`, `createWorker`, `afterWorkerSetup`).
   */
  public readonly emitter: PoolEmitter = new PoolEmitter()

  /**
   * ID of the next message.
   */
  protected nextMessageId: number = 0

  /**
   * Constructs a new poolifier pool.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
   * @throws {Error} If the pool is not started from the main worker or if `filePath` is empty.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
    if (!this.filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }
  }

  /**
   * Number of workers that this pool should manage.
   *
   * @returns Number of workers that this pool manages.
   * @deprecated Only here for backward compatibility.
   */
  // eslint-disable-next-line spellcheck/spell-checker
  public get numWorkers (): number {
    return this.numberOfWorkers
  }

  /**
   * Index for the next worker.
   *
   * @returns Index for the next worker.
   * @deprecated Only here for backward compatibility.
   */
  public get nextWorker (): number {
    return this.nextWorkerIndex
  }

  /**
   * Sends the given data to a chosen worker and resolves with its response.
   *
   * @param data Data sent to the worker. `null` and `undefined` are replaced by an empty object;
   * every other value (including `0`, `''` and `false`) is forwarded unchanged.
   * @returns Promise that settles once the worker answers the message with the matching ID.
   */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    const messageId = ++this.nextMessageId
    // Register the response listener before the message is sent to the worker
    const res = this.internalExecute(worker, messageId)
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /**
   * Shuts down every worker of this pool.
   *
   * @returns Promise that resolves once `destroyWorker` has completed for every worker.
   */
  public async destroy (): Promise<void> {
    // Iterate over a snapshot: the `'exit'` listener removes workers from `this.workers`
    await Promise.all(
      this.workers.slice().map(worker => this.destroyWorker(worker))
    )
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase the number of tasks currently in progress on the given worker.
   *
   * @param worker Worker whose task count is increased.
   * @throws {Error} If the worker is not registered in the `tasks` map.
   */
  protected increaseWorkersTask (worker: Worker): void {
    const numberOfTasksTheWorkerHas = this.tasks.get(worker)
    if (numberOfTasksTheWorkerHas === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksTheWorkerHas + 1)
  }

  /**
   * Decrease the number of tasks currently in progress on the given worker.
   *
   * @param worker Worker whose task count is decreased.
   * @throws {Error} If the worker is not registered in the `tasks` map.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    const numberOfTasksTheWorkerHas = this.tasks.get(worker)
    if (numberOfTasksTheWorkerHas === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksTheWorkerHas - 1)
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not part of `workers` leaves the list untouched.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // `splice(-1, 1)` would remove the last worker, so only splice on a real match
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * The default implementation uses a round robin algorithm to distribute the load.
   *
   * @returns Worker.
   * @throws {Error} If the pool currently has no workers.
   */
  protected chooseWorker (): Worker {
    const workerCount = this.workers.length
    if (workerCount === 0) {
      throw new Error('Cannot choose a worker from an empty pool')
    }
    // The index may be stale when workers have been removed since the last call
    if (this.nextWorkerIndex >= workerCount) {
      this.nextWorkerIndex = 0
    }
    const chosenWorker = this.workers[this.nextWorkerIndex]
    this.nextWorkerIndex = (this.nextWorkerIndex + 1) % workerCount
    return chosenWorker
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Registers a listener for messages on the given worker.
   *
   * @param worker The worker to listen on.
   * @param listener The message listener to register.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Unregisters a message listener from the given worker.
   *
   * @param worker The worker the listener was registered on.
   * @param listener The message listener to unregister.
   */
  protected abstract unregisterWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Waits for the response to the message with the given ID.
   *
   * Messages with another ID are ignored. On the matching message the listener is unregistered,
   * the task count of the worker is decreased and the promise is settled.
   *
   * @param worker The worker that handles the task.
   * @param messageId ID of the message whose response is awaited.
   * @returns Promise rejected with `message.error` if it is set, resolved with `message.data` otherwise.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise((resolve, reject) => {
      const listener: (message: MessageValue<Response>) => void = message => {
        if (message.id === messageId) {
          this.unregisterWorkerMessageListener(worker, listener)
          this.decreaseWorkersTasks(worker)
          if (message.error) reject(message.error)
          else resolve(message.data as Response)
        }
      }
      this.registerWorkerMessageListener(worker, listener)
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    // Fall back to no-op listeners when the corresponding option is not set
    worker.on('error', this.opts.errorHandler ?? (() => {}))
    worker.on('online', this.opts.onlineHandler ?? (() => {}))
    worker.on('exit', this.opts.exitHandler ?? (() => {}))
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }
}
315}