7944606d9c84dbcc7dc32a9547c8a6e425b48e01
[poolifier.git] / src / pools / abstract-pool.ts
1 import type { MessageValue } from '../utility-types'
2 import type { IPoolInternal } from './pool-internal'
3 import { PoolEmitter } from './pool-internal'
4 import type { WorkerChoiceStrategy } from './selection-strategies'
5 import {
6 WorkerChoiceStrategies,
7 WorkerChoiceStrategyContext
8 } from './selection-strategies'
9
/**
 * No-op callback.
 *
 * Registered as the worker event listener whenever the pool options do not
 * supply a handler for that event.
 */
const EMPTY_FUNCTION: () => void = (): void => undefined
16
/**
 * Signature of the callback invoked when a worker raises an error.
 *
 * The callback is typed to run with the worker as `this` and receives the
 * raised error.
 */
export type ErrorHandler<Worker> = (
  this: Worker,
  e: Error
) => void
21
/**
 * Signature of the callback invoked once a worker has started successfully.
 *
 * The callback is typed to run with the worker as `this` and takes no arguments.
 */
export type OnlineHandler<Worker> = (
  this: Worker
) => void
26
/**
 * Signature of the callback invoked when a worker emits its exit event.
 *
 * The callback is typed to run with the worker as `this` and receives the
 * exit code reported for the worker.
 */
export type ExitHandler<Worker> = (
  this: Worker,
  code: number
) => void
31
/**
 * Minimal event-listener contract that a worker must fulfil to be managed by a pool.
 *
 * Only the `error`, `online` and `exit` events are required.
 */
export interface IWorker {
  /**
   * Attaches a listener to the `error` event.
   *
   * @param event `'error'`.
   * @param handler Callback receiving the raised error.
   */
  on(
    event: 'error',
    handler: ErrorHandler<this>
  ): void

  /**
   * Attaches a listener to the `online` event.
   *
   * @param event `'online'`.
   * @param handler Callback run when the worker is online.
   */
  on(
    event: 'online',
    handler: OnlineHandler<this>
  ): void

  /**
   * Attaches a listener to the `exit` event.
   *
   * @param event `'exit'`.
   * @param handler Callback receiving the exit code.
   */
  on(
    event: 'exit',
    handler: ExitHandler<this>
  ): void

  /**
   * Attaches a listener to the `exit` event that will be invoked at most once.
   *
   * @param event `'exit'`.
   * @param handler Callback receiving the exit code.
   */
  once(
    event: 'exit',
    handler: ExitHandler<this>
  ): void
}
65
/**
 * Configuration accepted by a poolifier pool. Every option is optional.
 */
export interface PoolOptions<Worker> {
  /**
   * Listener attached to the `error` event of each worker.
   */
  errorHandler?: ErrorHandler<Worker>

  /**
   * Listener attached to the `online` event of each worker.
   */
  onlineHandler?: OnlineHandler<Worker>

  /**
   * Listener attached to the `exit` event of each worker.
   */
  exitHandler?: ExitHandler<Worker>

  /**
   * Value used to set `maxListeners` on event emitters (workers are event emitters).
   *
   * Its only purpose is to avoid non-useful warning messages.
   *
   * @default 1000
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number

  /**
   * The worker choice strategy to use in this pool.
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
}
96
/**
 * Base class containing the logic shared by all poolifier pools.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /** @inheritdoc */
  public readonly workers: Worker[] = []

  /** @inheritdoc */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** @inheritdoc */
  public readonly emitter: PoolEmitter

  /**
   * Counter used to generate message IDs.
   *
   * It is incremented before each task is sent, so the first message gets ID 1.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Default to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * Note that the emitter and the worker choice strategy context are only
   * assigned after all the initial workers have been created, so code run
   * during worker setup must not rely on them.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
   * @throws {Error} If the pool is not started from the main worker, or if `filePath` is empty.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkFilePath(this.filePath)
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    this.emitter = new PoolEmitter()
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    )
  }

  /**
   * Ensures that a worker file path has been provided.
   *
   * @param filePath Path to the worker-file.
   * @throws {Error} If `filePath` is empty.
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /** @inheritdoc */
  public isDynamic (): boolean {
    return false
  }

  /** @inheritdoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritdoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    const messageId = ++this.nextMessageId
    // The response listener is registered before the message is sent
    const res = this.internalExecute(worker, messageId)
    // Only a missing payload (null/undefined) is replaced by an empty object;
    // `??` keeps valid falsy payloads such as 0, '' or false untouched.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /** @inheritdoc */
  public async destroy (): Promise<void> {
    // Iterate over a snapshot: removeWorker() splices `this.workers` when a
    // worker exits, which must not disturb the iteration.
    await Promise.all(
      [...this.workers].map(worker => this.destroyWorker(worker))
    )
  }

  /** @inheritdoc */
  public abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increases by one the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are increased.
   * @throws {Error} If the worker is not registered in the tasks map.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decreases by one the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are decreased.
   * @throws {Error} If the worker is not registered in the tasks map.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Steps the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws {Error} If the worker is not registered in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksInProgress + step)
  }

  /**
   * Removes the given worker from the pool.
   *
   * Does nothing to the workers list if the worker is not part of it.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // indexOf() returns -1 for an unknown worker, and splice(-1, 1) would
    // remove the last worker of the pool instead of removing nothing.
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Chooses a worker for the next task.
   *
   * The choice is delegated to the current worker choice strategy context.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /** @inheritdoc */
  public abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Unregisters a message listener previously registered on the given worker.
   *
   * @param worker The worker the listener was registered on.
   * @param listener The listener to remove.
   */
  protected abstract unregisterWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Registers a one-shot response listener for the given message ID.
   *
   * Messages carrying another ID are ignored. When the matching message
   * arrives, the listener is unregistered, the worker's task counter is
   * decreased and the promise is settled.
   *
   * @param worker The worker expected to answer.
   * @param messageId ID of the message whose response is awaited.
   * @returns Promise resolved with the response data, or rejected with the message error if one is set.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise((resolve, reject) => {
      const listener: (message: MessageValue<Response>) => void = message => {
        if (message.id === messageId) {
          this.unregisterWorkerMessageListener(worker, listener)
          this.decreaseWorkersTasks(worker)
          if (message.error) reject(message.error)
          else resolve(message.data as Response)
        }
      }
      this.registerWorkerMessageListener(worker, listener)
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or to bind the `main-worker`<->`worker` connection if not bound by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /** @inheritdoc */
  public createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    // Fall back to a no-op listener for every handler missing from the options
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // Drop the worker from the pool registries as soon as it exits
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }
}