Extract selection strategies to classes (#176)
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
c97c7edb 1import type { MessageValue } from '../utility-types'
a35560ba
S
2import type { IPoolInternal } from './pool-internal'
3import { PoolEmitter } from './pool-internal'
4import type { WorkerChoiceStrategy } from './selection-strategies'
5import {
6 WorkerChoiceStrategies,
7 WorkerChoiceStrategyContext
8} from './selection-strategies'
c97c7edb 9
c510fea7
APA
10/**
11 * An intentional empty function.
12 */
a35560ba
S
13const EMPTY_FUNCTION: () => void = () => {
14 /* Intentionally empty */
c510fea7
APA
15}
16
729c563d
S
17/**
18 * Callback invoked if the worker raised an error.
19 */
c97c7edb 20export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
729c563d
S
21
22/**
23 * Callback invoked when the worker has started successfully.
24 */
c97c7edb 25export type OnlineHandler<Worker> = (this: Worker) => void
729c563d
S
26
27/**
28 * Callback invoked when the worker exits successfully.
29 */
c97c7edb
S
30export type ExitHandler<Worker> = (this: Worker, code: number) => void
31
729c563d
S
32/**
33 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
34 */
c97c7edb 35export interface IWorker {
3832ad95
S
36 /**
37 * Register a listener to the error event.
38 *
39 * @param event `'error'`.
40 * @param handler The error handler.
41 */
c97c7edb 42 on(event: 'error', handler: ErrorHandler<this>): void
3832ad95
S
43 /**
44 * Register a listener to the online event.
45 *
46 * @param event `'online'`.
47 * @param handler The online handler.
48 */
c97c7edb 49 on(event: 'online', handler: OnlineHandler<this>): void
3832ad95
S
50 /**
51 * Register a listener to the exit event.
52 *
53 * @param event `'exit'`.
54 * @param handler The exit handler.
55 */
c97c7edb 56 on(event: 'exit', handler: ExitHandler<this>): void
3832ad95
S
57 /**
58 * Register a listener to the exit event that will only performed once.
59 *
60 * @param event `'exit'`.
61 * @param handler The exit handler.
62 */
45dbbb14 63 once(event: 'exit', handler: ExitHandler<this>): void
c97c7edb
S
64}
65
729c563d
S
66/**
67 * Options for a poolifier pool.
68 */
c97c7edb
S
69export interface PoolOptions<Worker> {
70 /**
71 * A function that will listen for error event on each worker.
72 */
73 errorHandler?: ErrorHandler<Worker>
74 /**
75 * A function that will listen for online event on each worker.
76 */
77 onlineHandler?: OnlineHandler<Worker>
78 /**
79 * A function that will listen for exit event on each worker.
80 */
81 exitHandler?: ExitHandler<Worker>
82 /**
729c563d
S
83 * This is just to avoid non-useful warning messages.
84 *
85 * Will be used to set `maxListeners` on event emitters (workers are event emitters).
c97c7edb
S
86 *
87 * @default 1000
729c563d 88 * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
c97c7edb
S
89 */
90 maxTasks?: number
a35560ba
S
91 /**
92 * The work choice strategy to use in this pool.
93 */
94 workerChoiceStrategy?: WorkerChoiceStrategy
c97c7edb
S
95}
96
729c563d
S
97/**
98 * Base class containing some shared logic for all poolifier pools.
99 *
100 * @template Worker Type of worker which manages this pool.
deb85c12
JB
101 * @template Data Type of data sent to the worker. This can only be serializable data.
102 * @template Response Type of response of execution. This can only be serializable data.
729c563d 103 */
c97c7edb
S
104export abstract class AbstractPool<
105 Worker extends IWorker,
d3c8a1a8
S
106 Data = unknown,
107 Response = unknown
a35560ba
S
108> implements IPoolInternal<Worker, Data, Response> {
109 /** @inheritdoc */
c97c7edb 110 public readonly workers: Worker[] = []
729c563d 111
a35560ba 112 /** @inheritdoc */
c97c7edb
S
113 public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
114
a35560ba 115 /** @inheritdoc */
c97c7edb
S
116 public readonly emitter: PoolEmitter
117
729c563d
S
118 /**
119 * ID of the next message.
120 */
280c2a77 121 protected nextMessageId: number = 0
c97c7edb 122
a35560ba
S
123 /**
124 * Worker choice strategy instance implementing the worker choice algorithm.
125 *
126 * Default to a strategy implementing a round robin algorithm.
127 */
128 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
129 Worker,
130 Data,
131 Response
132 >
133
729c563d
S
134 /**
135 * Constructs a new poolifier pool.
136 *
5c5a1fb7 137 * @param numberOfWorkers Number of workers that this pool should manage.
729c563d
S
138 * @param filePath Path to the worker-file.
139 * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
140 */
c97c7edb 141 public constructor (
5c5a1fb7 142 public readonly numberOfWorkers: number,
c97c7edb
S
143 public readonly filePath: string,
144 public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
145 ) {
146 if (!this.isMain()) {
147 throw new Error('Cannot start a pool from a worker!')
148 }
c510fea7 149 this.checkFilePath(this.filePath)
c97c7edb
S
150 this.setupHook()
151
5c5a1fb7 152 for (let i = 1; i <= this.numberOfWorkers; i++) {
280c2a77 153 this.createAndSetupWorker()
c97c7edb
S
154 }
155
156 this.emitter = new PoolEmitter()
a35560ba
S
157 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
158 this,
159 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
160 )
c97c7edb
S
161 }
162
a35560ba 163 private checkFilePath (filePath: string): void {
c510fea7
APA
164 if (!filePath) {
165 throw new Error('Please specify a file with a worker implementation')
166 }
167 }
168
a35560ba
S
169 /** @inheritdoc */
170 public isDynamic (): boolean {
171 return false
172 }
173
174 /** @inheritdoc */
175 public setWorkerChoiceStrategy (
176 workerChoiceStrategy: WorkerChoiceStrategy
177 ): void {
178 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
179 workerChoiceStrategy
180 )
181 }
182
183 /** @inheritdoc */
280c2a77
S
184 public execute (data: Data): Promise<Response> {
185 // Configure worker to handle message with the specified task
186 const worker = this.chooseWorker()
187 this.increaseWorkersTask(worker)
188 const messageId = ++this.nextMessageId
189 const res = this.internalExecute(worker, messageId)
190 this.sendToWorker(worker, { data: data || ({} as Data), id: messageId })
191 return res
192 }
c97c7edb 193
a35560ba 194 /** @inheritdoc */
c97c7edb 195 public async destroy (): Promise<void> {
45dbbb14 196 await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
c97c7edb
S
197 }
198
a35560ba
S
199 /** @inheritdoc */
200 public abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 201
729c563d 202 /**
280c2a77
S
203 * Setup hook that can be overridden by a Poolifier pool implementation
204 * to run code before workers are created in the abstract constructor.
729c563d 205 */
280c2a77
S
206 protected setupHook (): void {
207 // Can be overridden
208 }
c97c7edb 209
729c563d 210 /**
280c2a77
S
211 * Should return whether the worker is the main worker or not.
212 */
213 protected abstract isMain (): boolean
214
215 /**
216 * Increase the number of tasks that the given workers has done.
729c563d 217 *
f416c098 218 * @param worker Worker whose tasks are increased.
729c563d 219 */
280c2a77 220 protected increaseWorkersTask (worker: Worker): void {
f416c098 221 this.stepWorkerNumberOfTasks(worker, 1)
c97c7edb
S
222 }
223
c01733f1 224 /**
d63d3be3 225 * Decrease the number of tasks that the given workers has done.
c01733f1 226 *
f416c098 227 * @param worker Worker whose tasks are decreased.
c01733f1 228 */
229 protected decreaseWorkersTasks (worker: Worker): void {
f416c098
JB
230 this.stepWorkerNumberOfTasks(worker, -1)
231 }
232
233 /**
234 * Step the number of tasks that the given workers has done.
235 *
236 * @param worker Worker whose tasks are set.
237 * @param step Worker number of tasks step.
238 */
a35560ba 239 private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
d63d3be3 240 const numberOfTasksInProgress = this.tasks.get(worker)
241 if (numberOfTasksInProgress !== undefined) {
f416c098 242 this.tasks.set(worker, numberOfTasksInProgress + step)
c01733f1 243 } else {
244 throw Error('Worker could not be found in tasks map')
245 }
246 }
247
729c563d
S
248 /**
249 * Removes the given worker from the pool.
250 *
251 * @param worker Worker that will be removed.
252 */
f2fdaa86
JB
253 protected removeWorker (worker: Worker): void {
254 // Clean worker from data structure
255 const workerIndex = this.workers.indexOf(worker)
256 this.workers.splice(workerIndex, 1)
257 this.tasks.delete(worker)
258 }
259
280c2a77
S
260 /**
261 * Choose a worker for the next task.
262 *
263 * The default implementation uses a round robin algorithm to distribute the load.
264 *
265 * @returns Worker.
266 */
267 protected chooseWorker (): Worker {
a35560ba 268 return this.workerChoiceStrategyContext.execute()
c97c7edb
S
269 }
270
280c2a77
S
271 /**
272 * Send a message to the given worker.
273 *
274 * @param worker The worker which should receive the message.
275 * @param message The message.
276 */
277 protected abstract sendToWorker (
278 worker: Worker,
279 message: MessageValue<Data>
280 ): void
281
a35560ba
S
282 /** @inheritdoc */
283 public abstract registerWorkerMessageListener<
4f7fa42a
S
284 Message extends Data | Response
285 > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 286
4f7fa42a
S
287 protected abstract unregisterWorkerMessageListener<
288 Message extends Data | Response
289 > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 290
280c2a77
S
291 protected internalExecute (
292 worker: Worker,
293 messageId: number
294 ): Promise<Response> {
c97c7edb
S
295 return new Promise((resolve, reject) => {
296 const listener: (message: MessageValue<Response>) => void = message => {
280c2a77 297 if (message.id === messageId) {
c97c7edb 298 this.unregisterWorkerMessageListener(worker, listener)
c01733f1 299 this.decreaseWorkersTasks(worker)
c97c7edb
S
300 if (message.error) reject(message.error)
301 else resolve(message.data as Response)
302 }
303 }
304 this.registerWorkerMessageListener(worker, listener)
305 })
306 }
307
729c563d
S
308 /**
309 * Returns a newly created worker.
310 */
280c2a77 311 protected abstract createWorker (): Worker
c97c7edb 312
729c563d
S
313 /**
314 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
315 *
316 * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
317 *
318 * @param worker The newly created worker.
319 */
280c2a77 320 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 321
a35560ba
S
322 /** @inheritdoc */
323 public createAndSetupWorker (): Worker {
280c2a77
S
324 const worker: Worker = this.createWorker()
325
a35560ba
S
326 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
327 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
328 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
45dbbb14 329 worker.once('exit', () => this.removeWorker(worker))
280c2a77 330
c97c7edb 331 this.workers.push(worker)
280c2a77
S
332
333 // Init tasks map
c97c7edb 334 this.tasks.set(worker, 0)
280c2a77
S
335
336 this.afterWorkerSetup(worker)
337
c97c7edb
S
338 return worker
339 }
340}