Pool busy event emitting on all pool types (#241)
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
be0676b3
APA
1import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4} from '../utility-types'
4a6952ff 5import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
a35560ba 6import type { IPoolInternal } from './pool-internal'
7c0ba920 7import { PoolEmitter, PoolType } from './pool-internal'
a35560ba
S
8import type { WorkerChoiceStrategy } from './selection-strategies'
9import {
10 WorkerChoiceStrategies,
11 WorkerChoiceStrategyContext
12} from './selection-strategies'
c97c7edb 13
c510fea7
APA
/**
 * An intentionally empty function.
 *
 * Used as the fallback listener when no handler is supplied for a worker event.
 */
const EMPTY_FUNCTION: () => void = () => {
  /* Intentionally empty */
}
20
729c563d
S
/**
 * Callback invoked if the worker raised an error.
 *
 * `this` is typed as the worker the handler is registered on;
 * `e` is the error that was raised.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
729c563d
S
25
/**
 * Callback invoked when the worker has started successfully.
 *
 * `this` is typed as the worker the handler is registered on.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
729c563d
S
30
/**
 * Callback invoked when the worker exits successfully.
 *
 * `this` is typed as the worker the handler is registered on;
 * `code` is the numeric exit code passed to the handler.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
35
729c563d
S
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 */
export interface IWorker {
  /**
   * Register a listener to the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Register a listener to the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Register a listener to the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Register a listener to the exit event that will only be performed once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
69
729c563d
S
/**
 * Options for a poolifier pool.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * The worker choice strategy to use in this pool.
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
  /**
   * Pool events emission.
   *
   * Defaults to `true`.
   */
  enableEvents?: boolean
}
97
729c563d
S
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /** @inheritdoc */
  public readonly workers: Worker[] = []

  /** @inheritdoc */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** @inheritdoc */
  public readonly emitter?: PoolEmitter

  /** @inheritdoc */
  public readonly max?: number

  /**
   * The promise map.
   *
   * - `key`: This is the message ID of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
   */
  protected promiseMap: Map<
    number,
    PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /**
   * ID of the next message.
   *
   * Pre-incremented on each submitted task, so the first task gets ID `1`.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Defaults to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool.
   * @throws Error if called from a worker, or if the number of workers or the file path is invalid.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    if (this.opts.enableEvents) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      () => {
        // Factory handed to the strategy context to create an extra worker on demand
        const workerCreated = this.createAndSetupWorker()
        this.registerWorkerMessageListener(workerCreated, message => {
          const tasksInProgress = this.tasks.get(workerCreated)
          if (
            isKillBehavior(KillBehaviors.HARD, message.kill) ||
            tasksInProgress === 0
          ) {
            // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
            void this.destroyWorker(workerCreated)
          }
        })
        return workerCreated
      },
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    )
  }

  /**
   * Ensures a worker file path was supplied.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if the path is empty or missing.
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Validates the requested number of workers.
   *
   * @param numberOfWorkers Number of workers requested for this pool.
   * @throws Error if the value is missing, not a safe integer, negative, or zero for a fixed pool.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new Error(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new Error(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Applies default values to the pool options.
   *
   * Note that this writes the default back onto `this.opts`.
   *
   * @param opts Options for the pool.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    this.opts.enableEvents = opts.enableEvents ?? true
  }

  /** @inheritdoc */
  public abstract get type (): PoolType

  /** @inheritdoc */
  public get numberOfRunningTasks (): number {
    return this.promiseMap.size
  }

  /** @inheritdoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritdoc */
  public abstract get busy (): boolean

  /**
   * Shared busy computation that pool implementations can use for `busy`.
   *
   * @returns `true` when there are at least as many running tasks as workers and no worker has zero tasks.
   */
  protected internalGetBusyStatus (): boolean {
    return (
      this.numberOfRunningTasks >= this.numberOfWorkers &&
      this.findFreeTasksMapEntry() === false
    )
  }

  /** @inheritdoc */
  public findFreeTasksMapEntry (): [Worker, number] | false {
    for (const [worker, numberOfTasks] of this.tasks) {
      if (numberOfTasks === 0) {
        // A worker is free, return the matching tasks map entry
        return [worker, numberOfTasks]
      }
    }
    return false
  }

  /** @inheritdoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    const messageId = ++this.nextMessageId
    const res = this.internalExecute(worker, messageId)
    // Check busy status only once the task is registered in the promise map,
    // otherwise the running tasks count lags one task behind.
    this.checkAndEmitBusy()
    // Only substitute an empty object for null/undefined so that valid falsy
    // payloads (0, '', false) reach the worker untouched.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /** @inheritdoc */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase the number of tasks that the given worker has applied.
   *
   * @param worker Worker whose tasks are increased.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decrease the number of tasks that the given worker has applied.
   *
   * @param worker Worker whose tasks are decreased.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Step the number of tasks that the given worker has applied.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws Error if the worker has no entry in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress !== undefined) {
      this.tasks.set(worker, numberOfTasksInProgress + step)
    } else {
      throw new Error('Worker could not be found in tasks map')
    }
  }

  /**
   * Removes the given worker from the pool.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // Guard against an unknown worker: splice(-1, 1) would drop the last worker instead
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * Delegates to the configured worker choice strategy context.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Register a listener callback on a given worker.
   *
   * @param worker A worker.
   * @param listener A message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Registers a pending task in the promise map.
   *
   * @param worker The worker the task is assigned to.
   * @param messageId ID of the message carrying the task.
   * @returns A promise settled by the worker listener when a message with the same ID is received.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // Drop the worker from the pool registries as soon as it exits
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * @returns The listener function to execute when a message is sent from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      // Explicit null check rather than truthiness, so the test does not depend on the ID's value
      if (message.id != null) {
        const value = this.promiseMap.get(message.id)
        if (value !== undefined) {
          this.decreaseWorkersTasks(value.worker)
          if (message.error) value.reject(message.error)
          else value.resolve(message.data as Response)
          this.promiseMap.delete(message.id)
        }
      }
    }
  }

  /**
   * Emits the `'busy'` event when events are enabled and the pool reports itself busy.
   */
  private checkAndEmitBusy (): void {
    if (this.opts.enableEvents && this.busy) {
      this.emitter?.emit('busy')
    }
  }
}