Add benchmark for quick select algorithm evaluation. (#255)
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
be0676b3
APA
1import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4} from '../utility-types'
4a6952ff 5import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
a35560ba
S
6import type { IPoolInternal } from './pool-internal'
7import { PoolEmitter } from './pool-internal'
8import type { WorkerChoiceStrategy } from './selection-strategies'
9import {
10 WorkerChoiceStrategies,
11 WorkerChoiceStrategyContext
12} from './selection-strategies'
c97c7edb 13
c510fea7
APA
/**
 * An intentionally empty function.
 *
 * Used as the fallback listener when a pool option handler is not provided.
 */
const EMPTY_FUNCTION: () => void = () => {
  /* Intentionally empty */
}
20
729c563d
S
/**
 * Callback invoked if the worker raised an error.
 *
 * `this` is bound to the worker; `e` is the error that was raised.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
729c563d
S
25
/**
 * Callback invoked when the worker has started successfully.
 *
 * `this` is bound to the worker.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
729c563d
S
30
/**
 * Callback invoked when the worker exits.
 *
 * `this` is bound to the worker; `code` is the exit code reported for it.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
35
729c563d
S
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 */
export interface IWorker {
  /**
   * Register a listener to the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Register a listener to the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Register a listener to the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Register a listener to the exit event that will only be performed once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
69
729c563d
S
/**
 * Options for a poolifier pool.
 *
 * Every field is optional.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * The worker choice strategy to use in this pool.
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
}
91
729c563d
S
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /** @inheritdoc */
  public readonly workers: Worker[] = []

  /** @inheritdoc */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** @inheritdoc */
  public readonly emitter: PoolEmitter

  /**
   * The promise map.
   *
   * - `key`: This is the message ID of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
   */
  protected promiseMap: Map<
    number,
    PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /**
   * ID of the last message sent. It is pre-incremented on each submitted
   * task, so the first task gets ID `1`.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Default to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool.
   * @throws Error if called from a worker, if `numberOfWorkers` is missing,
   * not a safe integer, negative, or zero for a non-dynamic pool, or if
   * `filePath` is empty.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    this.emitter = new PoolEmitter()
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      // Factory handed to the strategy context to create an extra worker.
      () => {
        const workerCreated = this.createAndSetupWorker()
        this.registerWorkerMessageListener(workerCreated, message => {
          const tasksInProgress = this.tasks.get(workerCreated)
          // NOTE(review): the `tasksInProgress === 0` branch is not gated on
          // `message.kill`, so it appears to fire for any message received
          // while the worker is idle - confirm this is intended.
          if (
            isKillBehavior(KillBehaviors.HARD, message.kill) ||
            tasksInProgress === 0
          ) {
            // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
            void this.destroyWorker(workerCreated)
          }
        })
        return workerCreated
      },
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    )
  }

  /**
   * Validates the worker file path.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if `filePath` is falsy (e.g. empty string).
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Validates the requested number of workers.
   *
   * @param numberOfWorkers Number of workers requested for the pool.
   * @throws Error if the value is null/undefined, not a safe integer,
   * negative, or zero while the pool is not dynamic.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new Error(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new Error(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (!this.dynamic && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /** @inheritdoc */
  public get dynamic (): boolean {
    return false
  }

  /** @inheritdoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritdoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    const messageId = ++this.nextMessageId
    const res = this.internalExecute(worker, messageId)
    // Only substitute an empty object for null/undefined: `??` keeps valid
    // falsy payloads such as `0`, `''` or `false` intact.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /** @inheritdoc */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are increased.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decrease the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are decreased.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Step the number of tasks in progress on the given worker.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws Error if the worker has no entry in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress !== undefined) {
      this.tasks.set(worker, numberOfTasksInProgress + step)
    } else {
      throw new Error('Worker could not be found in tasks map')
    }
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not registered in `workers` leaves the list untouched.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // indexOf() yields -1 for an unknown worker and splice(-1, 1) would drop
    // the last, unrelated worker, so only splice on an actual hit.
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * Delegates to the worker choice strategy context; the default strategy
   * uses a round robin algorithm to distribute the load.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Register a listener callback on a given worker.
   *
   * @param worker A worker.
   * @param listener A message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Creates the promise for a submitted task and records its settle
   * functions in the promise map under the message ID.
   *
   * @param worker The worker the task is assigned to.
   * @param messageId The message ID of the task.
   * @returns Promise settled by the worker listener when the matching response arrives.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // Drop the worker from the pool bookkeeping once it has exited.
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * Messages without an ID, or with an ID that has no pending promise, are ignored.
   *
   * @returns The listener function to execute when a message is sent from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      if (message.id != null) {
        const value = this.promiseMap.get(message.id)
        if (value) {
          this.decreaseWorkersTasks(value.worker)
          if (message.error) value.reject(message.error)
          else value.resolve(message.data as Response)
          this.promiseMap.delete(message.id)
        }
      }
    }
  }
}