Report some code cleanups from work in progress PR
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
be0676b3
APA
1import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4} from '../utility-types'
6e9d10db 5import { EMPTY_FUNCTION } from '../utils'
4a6952ff 6import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
a35560ba 7import type { IPoolInternal } from './pool-internal'
7c0ba920 8import { PoolEmitter, PoolType } from './pool-internal'
a35560ba
S
9import type { WorkerChoiceStrategy } from './selection-strategies'
10import {
11 WorkerChoiceStrategies,
12 WorkerChoiceStrategyContext
13} from './selection-strategies'
c97c7edb 14
35cf1c03
JB
/**
 * Callback invoked when a worker has received a message.
 *
 * The handler is called with the worker as `this` and the raw, untyped
 * message as its only argument.
 */
export type MessageHandler<Worker> = (this: Worker, m: unknown) => void
19
729c563d
S
/**
 * Callback invoked when a worker raised an error.
 *
 * The handler is called with the worker as `this` and the raised error as
 * its only argument.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
729c563d
S
24
/**
 * Callback invoked when a worker has started successfully.
 *
 * The handler is called with the worker as `this` and receives no argument.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
729c563d
S
29
/**
 * Callback invoked when a worker exits.
 *
 * The handler is called with the worker as `this` and the exit code of the
 * worker as its only argument.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
34
729c563d
S
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 */
export interface IWorker {
  /**
   * Register a listener to the message event.
   *
   * @param event `'message'`.
   * @param handler The message handler.
   */
  on(event: 'message', handler: MessageHandler<this>): void
  /**
   * Register a listener to the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Register a listener to the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Register a listener to the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Register a listener to the exit event that will only be performed once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
75
729c563d
S
/**
 * Options for a poolifier pool.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for message event on each worker.
   */
  messageHandler?: MessageHandler<Worker>
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * The worker choice strategy to use in this pool.
   *
   * @default WorkerChoiceStrategies.ROUND_ROBIN
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
  /**
   * Pool events emission.
   *
   * @default true
   */
  enableEvents?: boolean
}
107
729c563d
S
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /** @inheritdoc */
  public readonly workers: Worker[] = []

  /** @inheritdoc */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** @inheritdoc */
  public readonly emitter?: PoolEmitter

  /** @inheritdoc */
  public readonly max?: number

  /**
   * The promise map.
   *
   * - `key`: This is the message Id of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
   */
  protected promiseMap: Map<
  number,
  PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /**
   * Id of the next message.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Default to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * Validates the arguments, runs the setup hook, creates `numberOfWorkers`
   * workers, then creates the event emitter (when events are enabled) and the
   * worker choice strategy context.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool.
   * @throws Error if the pool is not started from the main worker or if an argument is invalid.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    if (this.opts.enableEvents) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      () => {
        // Factory handed to the strategy context so that it can create an extra worker on demand
        const workerCreated = this.createAndSetupWorker()
        this.registerWorkerMessageListener(workerCreated, message => {
          // NOTE(review): the second operand is evaluated for every message received
          // from this worker, not only for kill messages - confirm this is intended.
          if (
            isKillBehavior(KillBehaviors.HARD, message.kill) ||
            this.tasks.get(workerCreated) === 0
          ) {
            // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
            this.destroyWorker(workerCreated) as void
          }
        })
        return workerCreated
      },
      this.opts.workerChoiceStrategy
    )
  }

  /**
   * Check that a worker file path has been given.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if the path is empty or missing.
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Check that the number of workers is a non negative safe integer,
   * and is not zero for a fixed pool.
   *
   * @param numberOfWorkers Number of workers requested for this pool.
   * @throws Error if the number of workers is invalid.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new Error(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new Error(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Fill the pool options with their default values:
   * round robin worker choice strategy and events enabled.
   *
   * @param opts Options given to the pool.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    this.opts.workerChoiceStrategy =
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    this.opts.enableEvents = opts.enableEvents ?? true
  }

  /** @inheritdoc */
  public abstract get type (): PoolType

  /** @inheritdoc */
  public get numberOfRunningTasks (): number {
    // Every submitted task owns exactly one entry in the promise map until its response is received
    return this.promiseMap.size
  }

  /** @inheritdoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritdoc */
  public abstract get busy (): boolean

  /**
   * Compute the busy status of the pool.
   *
   * @returns `true` if there are at least as many running tasks as `numberOfWorkers` and no worker has zero task, `false` otherwise.
   */
  protected internalGetBusyStatus (): boolean {
    return (
      this.numberOfRunningTasks >= this.numberOfWorkers &&
      this.findFreeTasksMapEntry() === false
    )
  }

  /** @inheritdoc */
  public findFreeTasksMapEntry (): [Worker, number] | false {
    for (const [worker, numberOfTasks] of this.tasks) {
      if (numberOfTasks === 0) {
        // A worker is free, return the matching tasks map entry
        return [worker, numberOfTasks]
      }
    }
    return false
  }

  /** @inheritdoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    const messageId = ++this.nextMessageId
    // The promise must be registered before the message is sent to the worker
    const res = this.internalExecute(worker, messageId)
    this.checkAndEmitBusy()
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /** @inheritdoc */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase by one the number of tasks assigned to the given worker.
   *
   * @param worker Worker whose tasks are increased.
   * @throws Error if the worker is not in the tasks map.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decrease by one the number of tasks assigned to the given worker.
   *
   * @param worker Worker whose tasks are decreased.
   * @throws Error if the worker is not in the tasks map.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Step the number of tasks assigned to the given worker.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws Error if the worker is not in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksInProgress + step)
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not part of the pool is ignored.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // `splice(-1, 1)` would remove the last worker, so an unknown worker must be skipped
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * The choice is delegated to the worker choice strategy context.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Register a listener callback on a given worker.
   *
   * @param worker A worker.
   * @param listener A message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Account a new task on the given worker and register the promise
   * that will be settled when the matching response is received.
   *
   * @param worker The worker which will execute the task.
   * @param messageId Id of the message carrying the task.
   * @returns Promise settled by the worker listener.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    this.increaseWorkersTask(worker)
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or to bind the `main-worker`<->`worker` connection if not bound by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker = this.createWorker()

    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // Keep the pool registries in sync once the worker has exited
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * Messages without an id, or whose id has no entry in the promise map, are ignored.
   *
   * @returns The listener function to execute when a message is received from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      if (message.id !== undefined) {
        const promise = this.promiseMap.get(message.id)
        if (promise !== undefined) {
          this.decreaseWorkersTasks(promise.worker)
          if (message.error) promise.reject(message.error)
          else promise.resolve(message.data as Response)
          this.promiseMap.delete(message.id)
        }
      }
    }
  }

  /**
   * Emit the `'busy'` event if events are enabled and the pool is busy.
   */
  private checkAndEmitBusy (): void {
    if (this.opts.enableEvents && this.busy) {
      this.emitter?.emit('busy')
    }
  }
}