// Commit: Add eslint-plugin-spellcheck (#139)
// File: [poolifier.git] / src / pools / abstract-pool.ts
import EventEmitter from 'events'
import type { MessageValue } from '../utility-types'
import type { IPool } from './pool'
4
/**
 * Callback invoked if the worker raised an error.
 *
 * The callback is typed so that `this` is the worker and the raised error is
 * passed as the only argument.
 *
 * @template Worker Type of the worker the callback is bound to.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
9
/**
 * Callback invoked when the worker has started successfully.
 *
 * The callback is typed so that `this` is the worker; it takes no arguments.
 *
 * @template Worker Type of the worker the callback is bound to.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
14
/**
 * Callback invoked when the worker exits.
 *
 * The callback is typed so that `this` is the worker and a numeric `code` is
 * passed as the only argument.
 *
 * @template Worker Type of the worker the callback is bound to.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
19
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 *
 * A worker must allow handlers to be registered for the `'error'`, `'online'`
 * and `'exit'` events; each handler is typed with the worker itself as `this`.
 */
export interface IWorker {
  on(event: 'error', handler: ErrorHandler<this>): void
  on(event: 'online', handler: OnlineHandler<this>): void
  on(event: 'exit', handler: ExitHandler<this>): void
}
28
/**
 * Options for a poolifier pool.
 *
 * Every option is optional.
 *
 * @template Worker Type of the worker the handlers are bound to.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * This is just to avoid non-useful warning messages.
   *
   * Will be used to set `maxListeners` on event emitters (workers are event emitters).
   *
   * @default 1000
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number
}
55
/**
 * Internal poolifier pool emitter.
 *
 * A plain `EventEmitter` subclass that adds no members of its own.
 */
class PoolEmitter extends EventEmitter {}
60
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * The constructor creates `numberOfWorkers` workers through the abstract
 * `newWorker` factory. `execute` then hands each task to the worker returned
 * by `chooseWorker` and resolves once that worker answers with a message
 * carrying the same id.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker.
 * @template Response Type of response of execution.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Data, Response> {
  /**
   * List of currently available workers.
   */
  public readonly workers: Worker[] = []

  /**
   * Round robin cursor into `workers`.
   *
   * `chooseWorker` advances it and returns the worker at the new position.
   */
  public nextWorker: number = 0

  /**
   * - `key`: The `Worker`
   * - `value`: Number of tasks that have been assigned to that worker and have not completed yet
   */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /**
   * Emitter on which events can be listened to.
   *
   * Events that can currently be listened to:
   *
   * - `'FullPool'`
   */
  public readonly emitter: PoolEmitter

  /**
   * ID of the next message.
   */
  protected id: number = 0

  /**
   * Constructs a new poolifier pool.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
   * @throws {Error} If `isMain()` returns `false` or if `filePath` is falsy.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
    if (!this.filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }

    // Created before the hooks below run so that it is already available to
    // code executed while the workers are being set up.
    this.emitter = new PoolEmitter()

    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.internalNewWorker()
    }
  }

  /**
   * Number of workers that this pool should manage.
   *
   * @returns Number of workers that this pool manages.
   * @deprecated Only here for backward compatibility.
   */
  // eslint-disable-next-line spellcheck/spell-checker
  public get numWorkers (): number {
    return this.numberOfWorkers
  }

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Shuts down every worker in `workers`, one after another, awaiting each
   * `destroyWorker` call before starting the next.
   */
  public async destroy (): Promise<void> {
    for (const worker of this.workers) {
      await this.destroyWorker(worker)
    }
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Registers one more pending task for the given worker in `tasks`.
   *
   * Despite its name this method does not add a worker to `workers`; it only
   * increments the worker's task counter.
   *
   * @param worker Worker whose task counter will be incremented.
   * @throws {Error} If the worker has no entry in `tasks`.
   */
  protected addWorker (worker: Worker): void {
    this.shiftTaskCount(worker, 1)
  }

  /**
   * Marks one task of the given worker as completed in `tasks`.
   *
   * @param worker Worker whose task counter will be decremented.
   * @throws {Error} If the worker has no entry in `tasks`.
   */
  protected decreaseWorkerTasks (worker: Worker): void {
    this.shiftTaskCount(worker, -1)
  }

  /**
   * Adds `delta` to the task counter of the given worker.
   *
   * @param worker Worker whose counter is updated.
   * @param delta Amount added to the counter (may be negative).
   * @throws {Error} If the worker has no entry in `tasks`.
   */
  private shiftTaskCount (worker: Worker, delta: number): void {
    const pendingTasks = this.tasks.get(worker)
    if (pendingTasks === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, pendingTasks + delta)
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not part of `workers` leaves the list untouched.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // `splice(-1, 1)` would drop the last worker, so only splice on a real hit.
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Runs a task on the worker returned by `chooseWorker`.
   *
   * @param data Payload sent to the worker. When it is `null` or `undefined`
   * an empty object is sent instead; every other value is sent unchanged.
   * @returns Promise settled by the worker's answer to this task.
   */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.addWorker(worker)
    const id = ++this.id
    const res = this.internalExecute(worker, id)
    // `??` instead of `||` so that falsy payloads (0, '', false) are kept.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: id })
    return res
  }

  /**
   * Registers a listener for messages coming from the given worker.
   *
   * @param port Worker to listen on.
   * @param listener Function called with each received message.
   */
  protected abstract registerWorkerMessageListener (
    port: Worker,
    listener: (message: MessageValue<Response>) => void
  ): void

  /**
   * Removes a listener previously passed to `registerWorkerMessageListener`.
   *
   * @param port Worker the listener was registered on.
   * @param listener The listener to remove.
   */
  protected abstract unregisterWorkerMessageListener (
    port: Worker,
    listener: (message: MessageValue<Response>) => void
  ): void

  /**
   * Waits for the answer to the message with the given id.
   *
   * @param worker Worker the task has been assigned to.
   * @param id ID of the message to wait for.
   * @returns Promise resolved with `message.data`, or rejected with
   * `message.error` when the answer carries a truthy `error`.
   */
  protected internalExecute (worker: Worker, id: number): Promise<Response> {
    return new Promise((resolve, reject) => {
      const listener: (message: MessageValue<Response>) => void = message => {
        // Messages belonging to other tasks of the same worker are ignored.
        if (message.id === id) {
          this.unregisterWorkerMessageListener(worker, listener)
          // The task is done: release it instead of counting it a second time.
          this.decreaseWorkerTasks(worker)
          if (message.error) reject(message.error)
          else resolve(message.data as Response)
        }
      }
      this.registerWorkerMessageListener(worker, listener)
    })
  }

  /**
   * Choose a worker for the next task.
   *
   * The default implementation uses a round robin algorithm to distribute the load.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    // `>=` (not `===`) so that the cursor wraps even when it points past the
    // end of the list, e.g. after `removeWorker` shrank `workers`.
    this.nextWorker =
      this.nextWorker >= this.workers.length - 1 ? 0 : this.nextWorker + 1
    return this.workers[this.nextWorker]
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract newWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or to bind the `main-worker`<->`worker` connection if it is not bound by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterNewWorkerPushed (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * The handlers from `opts` (or no-op functions when an option is missing)
   * are attached, the worker is appended to `workers`, `afterNewWorkerPushed`
   * is called and the worker's task counter is initialised to `0`.
   *
   * @returns New, completely set up worker.
   */
  protected internalNewWorker (): Worker {
    const worker: Worker = this.newWorker()
    worker.on('error', this.opts.errorHandler ?? (() => {}))
    worker.on('online', this.opts.onlineHandler ?? (() => {}))
    // TODO handle properly when a worker exit
    worker.on('exit', this.opts.exitHandler ?? (() => {}))
    this.workers.push(worker)
    this.afterNewWorkerPushed(worker)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }
}