New bench execution and sleep between a test and another
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
be0676b3
APA
1import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4} from '../utility-types'
a35560ba
S
5import type { IPoolInternal } from './pool-internal'
6import { PoolEmitter } from './pool-internal'
7import type { WorkerChoiceStrategy } from './selection-strategies'
8import {
9 WorkerChoiceStrategies,
10 WorkerChoiceStrategyContext
11} from './selection-strategies'
c97c7edb 12
c510fea7
APA
/**
 * A no-op callback: takes no arguments, does nothing and returns nothing.
 */
const EMPTY_FUNCTION: () => void = (): void => {
  // Deliberately left without a body
}
19
729c563d
S
/**
 * Signature of the callback invoked if the worker raised an error.
 *
 * The callback is called with the worker bound as `this` and receives the raised error.
 *
 * @template Worker Type of the worker the callback is bound to.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
729c563d
S
24
/**
 * Signature of the callback invoked when the worker has started successfully.
 *
 * The callback is called with the worker bound as `this` and takes no arguments.
 *
 * @template Worker Type of the worker the callback is bound to.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
729c563d
S
29
/**
 * Signature of the callback invoked when the worker exits.
 *
 * The callback is called with the worker bound as `this` and receives a numeric `code`.
 *
 * @template Worker Type of the worker the callback is bound to.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
34
729c563d
S
/**
 * Minimal listener-registration contract that every pool worker has to fulfil.
 *
 * Handlers are typed against `this`, i.e. the concrete worker type implementing the interface.
 */
export interface IWorker {
  /**
   * Registers a listener for the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Registers a listener for the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Registers a listener for the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Registers a listener for the exit event that will be performed only once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
68
729c563d
S
/**
 * Optional settings accepted by a poolifier pool.
 *
 * @template Worker Type of worker the handlers are bound to.
 */
export interface PoolOptions<Worker> {
  /**
   * Listener for the error event of each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * Listener for the online event of each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * Listener for the exit event of each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * Only exists to avoid non-useful warning messages.
   *
   * Will be used to set `maxListeners` on event emitters (workers are event emitters).
   *
   * @default 1000
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number
  /**
   * The worker choice strategy to use in this pool.
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
}
99
729c563d
S
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /**
   * The promise map.
   *
   * - `key`: This is the message ID of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
   */
  protected promiseMap: Map<
    number,
    PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /** @inheritdoc */
  public readonly workers: Worker[] = []

  /** @inheritdoc */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** @inheritdoc */
  public readonly emitter: PoolEmitter

  /**
   * Message ID counter.
   *
   * It is incremented before each use, so the first submitted task gets the ID `1`.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Default to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * The workers are created after `setupHook()` ran and before the emitter and
   * the worker choice strategy context are instantiated.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
   * @throws Error if the pool is not started from the main worker or if `filePath` is empty.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkFilePath(this.filePath)
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    this.emitter = new PoolEmitter()
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    )
  }

  /**
   * Ensures that a worker file has been given.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if `filePath` is falsy (e.g. an empty string).
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /** @inheritdoc */
  public isDynamic (): boolean {
    return false
  }

  /** @inheritdoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritdoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    const messageId = ++this.nextMessageId
    // The promise is registered before the message is sent to the worker
    const res = this.internalExecute(worker, messageId)
    // Only `null`/`undefined` fall back to an empty object: `??` keeps
    // legitimate falsy payloads such as `0`, `''` or `false` untouched
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /** @inheritdoc */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /** @inheritdoc */
  public abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase by one the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are increased.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decrease by one the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are decreased.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Step the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws Error if the worker is not registered in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksInProgress + step)
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not part of the pool leaves the workers list untouched.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // `indexOf` yields -1 for an unknown worker and `splice(-1, 1)` would
    // then remove the last registered worker instead of nothing
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * The choice is delegated to the worker choice strategy context of this pool.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /** @inheritdoc */
  public abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Creates the promise of a task and registers its resolve/reject functions,
   * together with the worker, in the promise map under the given message ID.
   *
   * @param worker The worker the task is assigned to.
   * @param messageId The message ID of the task.
   * @returns A promise settled by the worker listener once a message with the same ID is received.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /** @inheritdoc */
  public createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    // Handlers missing from the options are replaced by a no-op
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // An exited worker is dropped from the pool data structures
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * Messages without an ID, or whose ID has no entry in the promise map, are ignored.
   *
   * @returns The listener function to execute when a message is sent from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    const listener: (message: MessageValue<Response>) => void = message => {
      if (message.id) {
        const value = this.promiseMap.get(message.id)
        if (value) {
          this.decreaseWorkersTasks(value.worker)
          if (message.error) value.reject(message.error)
          else value.resolve(message.data as Response)
          this.promiseMap.delete(message.id)
        }
      }
    }
    return listener
  }
}