Merge branch 'master' of github.com:pioardi/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
be0676b3
APA
1import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4} from '../utility-types'
6e9d10db 5import { EMPTY_FUNCTION } from '../utils'
4a6952ff 6import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
a35560ba 7import type { IPoolInternal } from './pool-internal'
7c0ba920 8import { PoolEmitter, PoolType } from './pool-internal'
a35560ba
S
9import type { WorkerChoiceStrategy } from './selection-strategies'
10import {
11 WorkerChoiceStrategies,
12 WorkerChoiceStrategyContext
13} from './selection-strategies'
c97c7edb 14
729c563d
S
15/**
16 * Callback invoked if the worker raised an error.
17 */
c97c7edb 18export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
729c563d
S
19
20/**
21 * Callback invoked when the worker has started successfully.
22 */
c97c7edb 23export type OnlineHandler<Worker> = (this: Worker) => void
729c563d
S
24
25/**
26 * Callback invoked when the worker exits successfully.
27 */
c97c7edb
S
28export type ExitHandler<Worker> = (this: Worker, code: number) => void
29
729c563d
S
30/**
31 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
32 */
c97c7edb 33export interface IWorker {
3832ad95
S
34 /**
35 * Register a listener to the error event.
36 *
37 * @param event `'error'`.
38 * @param handler The error handler.
39 */
c97c7edb 40 on(event: 'error', handler: ErrorHandler<this>): void
3832ad95
S
41 /**
42 * Register a listener to the online event.
43 *
44 * @param event `'online'`.
45 * @param handler The online handler.
46 */
c97c7edb 47 on(event: 'online', handler: OnlineHandler<this>): void
3832ad95
S
48 /**
49 * Register a listener to the exit event.
50 *
51 * @param event `'exit'`.
52 * @param handler The exit handler.
53 */
c97c7edb 54 on(event: 'exit', handler: ExitHandler<this>): void
3832ad95
S
55 /**
56 * Register a listener to the exit event that will only performed once.
57 *
58 * @param event `'exit'`.
59 * @param handler The exit handler.
60 */
45dbbb14 61 once(event: 'exit', handler: ExitHandler<this>): void
c97c7edb
S
62}
63
729c563d
S
64/**
65 * Options for a poolifier pool.
66 */
c97c7edb
S
67export interface PoolOptions<Worker> {
68 /**
69 * A function that will listen for error event on each worker.
70 */
71 errorHandler?: ErrorHandler<Worker>
72 /**
73 * A function that will listen for online event on each worker.
74 */
75 onlineHandler?: OnlineHandler<Worker>
76 /**
77 * A function that will listen for exit event on each worker.
78 */
79 exitHandler?: ExitHandler<Worker>
a35560ba
S
80 /**
81 * The work choice strategy to use in this pool.
82 */
83 workerChoiceStrategy?: WorkerChoiceStrategy
7c0ba920
JB
84 /**
85 * Pool events emission.
86 *
87 * Default to true.
88 */
89 enableEvents?: boolean
c97c7edb
S
90}
91
729c563d
S
92/**
93 * Base class containing some shared logic for all poolifier pools.
94 *
95 * @template Worker Type of worker which manages this pool.
deb85c12
JB
96 * @template Data Type of data sent to the worker. This can only be serializable data.
97 * @template Response Type of response of execution. This can only be serializable data.
729c563d 98 */
c97c7edb
S
99export abstract class AbstractPool<
100 Worker extends IWorker,
d3c8a1a8
S
101 Data = unknown,
102 Response = unknown
036c58dd
JB
103> implements IPoolInternal<Worker, Data, Response>
104{
4a6952ff
JB
105 /** @inheritdoc */
106 public readonly workers: Worker[] = []
107
108 /** @inheritdoc */
109 public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
110
111 /** @inheritdoc */
7c0ba920
JB
112 public readonly emitter?: PoolEmitter
113
114 /** @inheritdoc */
115 public readonly max?: number
4a6952ff 116
be0676b3
APA
117 /**
118 * The promise map.
119 *
e088a00c 120 * - `key`: This is the message Id of each submitted task.
be0676b3
APA
121 * - `value`: An object that contains the worker, the resolve function and the reject function.
122 *
123 * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
124 */
125 protected promiseMap: Map<
126 number,
127 PromiseWorkerResponseWrapper<Worker, Response>
128 > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
129
729c563d 130 /**
e088a00c 131 * Id of the next message.
729c563d 132 */
280c2a77 133 protected nextMessageId: number = 0
c97c7edb 134
a35560ba
S
135 /**
136 * Worker choice strategy instance implementing the worker choice algorithm.
137 *
138 * Default to a strategy implementing a round robin algorithm.
139 */
140 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
141 Worker,
142 Data,
143 Response
144 >
145
729c563d
S
146 /**
147 * Constructs a new poolifier pool.
148 *
5c5a1fb7 149 * @param numberOfWorkers Number of workers that this pool should manage.
729c563d 150 * @param filePath Path to the worker-file.
1927ee67 151 * @param opts Options for the pool.
729c563d 152 */
c97c7edb 153 public constructor (
5c5a1fb7 154 public readonly numberOfWorkers: number,
c97c7edb 155 public readonly filePath: string,
1927ee67 156 public readonly opts: PoolOptions<Worker>
c97c7edb
S
157 ) {
158 if (!this.isMain()) {
159 throw new Error('Cannot start a pool from a worker!')
160 }
8d3782fa 161 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 162 this.checkFilePath(this.filePath)
7c0ba920 163 this.checkPoolOptions(this.opts)
c97c7edb
S
164 this.setupHook()
165
5c5a1fb7 166 for (let i = 1; i <= this.numberOfWorkers; i++) {
280c2a77 167 this.createAndSetupWorker()
c97c7edb
S
168 }
169
7c0ba920
JB
170 if (this.opts.enableEvents) {
171 this.emitter = new PoolEmitter()
172 }
a35560ba
S
173 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
174 this,
4a6952ff
JB
175 () => {
176 const workerCreated = this.createAndSetupWorker()
14916bf9 177 this.registerWorkerMessageListener(workerCreated, async message => {
4a6952ff
JB
178 const tasksInProgress = this.tasks.get(workerCreated)
179 if (
180 isKillBehavior(KillBehaviors.HARD, message.kill) ||
181 tasksInProgress === 0
182 ) {
183 // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
14916bf9 184 await this.destroyWorker(workerCreated)
4a6952ff
JB
185 }
186 })
187 return workerCreated
188 },
e843b904 189 this.opts.workerChoiceStrategy
a35560ba 190 )
c97c7edb
S
191 }
192
a35560ba 193 private checkFilePath (filePath: string): void {
c510fea7
APA
194 if (!filePath) {
195 throw new Error('Please specify a file with a worker implementation')
196 }
197 }
198
8d3782fa
JB
199 private checkNumberOfWorkers (numberOfWorkers: number): void {
200 if (numberOfWorkers == null) {
201 throw new Error(
202 'Cannot instantiate a pool without specifying the number of workers'
203 )
204 } else if (!Number.isSafeInteger(numberOfWorkers)) {
205 throw new Error(
206 'Cannot instantiate a pool with a non integer number of workers'
207 )
208 } else if (numberOfWorkers < 0) {
209 throw new Error(
210 'Cannot instantiate a pool with a negative number of workers'
211 )
7c0ba920 212 } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
8d3782fa
JB
213 throw new Error('Cannot instantiate a fixed pool with no worker')
214 }
215 }
216
7c0ba920 217 private checkPoolOptions (opts: PoolOptions<Worker>): void {
e843b904
JB
218 this.opts.workerChoiceStrategy =
219 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
7c0ba920
JB
220 this.opts.enableEvents = opts.enableEvents ?? true
221 }
222
a35560ba 223 /** @inheritdoc */
7c0ba920
JB
224 public abstract get type (): PoolType
225
226 /** @inheritdoc */
227 public get numberOfRunningTasks (): number {
228 return this.promiseMap.size
a35560ba
S
229 }
230
231 /** @inheritdoc */
232 public setWorkerChoiceStrategy (
233 workerChoiceStrategy: WorkerChoiceStrategy
234 ): void {
b98ec2e6 235 this.opts.workerChoiceStrategy = workerChoiceStrategy
a35560ba
S
236 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
237 workerChoiceStrategy
238 )
239 }
240
7c0ba920
JB
241 /** @inheritdoc */
242 public abstract get busy (): boolean
243
244 protected internalGetBusyStatus (): boolean {
245 return (
246 this.numberOfRunningTasks >= this.numberOfWorkers &&
247 this.findFreeTasksMapEntry() === false
248 )
249 }
250
251 /** @inheritdoc */
252 public findFreeTasksMapEntry (): [Worker, number] | false {
253 for (const [worker, numberOfTasks] of this.tasks) {
254 if (numberOfTasks === 0) {
255 // A worker is free, return the matching tasks map entry
256 return [worker, numberOfTasks]
257 }
258 }
259 return false
260 }
261
a35560ba 262 /** @inheritdoc */
280c2a77
S
263 public execute (data: Data): Promise<Response> {
264 // Configure worker to handle message with the specified task
265 const worker = this.chooseWorker()
280c2a77
S
266 const messageId = ++this.nextMessageId
267 const res = this.internalExecute(worker, messageId)
14916bf9 268 this.checkAndEmitBusy()
280c2a77
S
269 this.sendToWorker(worker, { data: data || ({} as Data), id: messageId })
270 return res
271 }
c97c7edb 272
a35560ba 273 /** @inheritdoc */
c97c7edb 274 public async destroy (): Promise<void> {
45dbbb14 275 await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
c97c7edb
S
276 }
277
4a6952ff
JB
278 /**
279 * Shut down given worker.
280 *
281 * @param worker A worker within `workers`.
282 */
283 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 284
729c563d 285 /**
280c2a77
S
286 * Setup hook that can be overridden by a Poolifier pool implementation
287 * to run code before workers are created in the abstract constructor.
729c563d 288 */
280c2a77
S
289 protected setupHook (): void {
290 // Can be overridden
291 }
c97c7edb 292
729c563d 293 /**
280c2a77
S
294 * Should return whether the worker is the main worker or not.
295 */
296 protected abstract isMain (): boolean
297
298 /**
7c0ba920 299 * Increase the number of tasks that the given workers has applied.
729c563d 300 *
f416c098 301 * @param worker Worker whose tasks are increased.
729c563d 302 */
280c2a77 303 protected increaseWorkersTask (worker: Worker): void {
f416c098 304 this.stepWorkerNumberOfTasks(worker, 1)
c97c7edb
S
305 }
306
c01733f1 307 /**
7c0ba920 308 * Decrease the number of tasks that the given workers has applied.
c01733f1 309 *
f416c098 310 * @param worker Worker whose tasks are decreased.
c01733f1 311 */
312 protected decreaseWorkersTasks (worker: Worker): void {
f416c098
JB
313 this.stepWorkerNumberOfTasks(worker, -1)
314 }
315
316 /**
7c0ba920 317 * Step the number of tasks that the given workers has applied.
f416c098
JB
318 *
319 * @param worker Worker whose tasks are set.
320 * @param step Worker number of tasks step.
321 */
a35560ba 322 private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
d63d3be3 323 const numberOfTasksInProgress = this.tasks.get(worker)
324 if (numberOfTasksInProgress !== undefined) {
f416c098 325 this.tasks.set(worker, numberOfTasksInProgress + step)
c01733f1 326 } else {
327 throw Error('Worker could not be found in tasks map')
328 }
329 }
330
729c563d
S
331 /**
332 * Removes the given worker from the pool.
333 *
334 * @param worker Worker that will be removed.
335 */
f2fdaa86
JB
336 protected removeWorker (worker: Worker): void {
337 // Clean worker from data structure
338 const workerIndex = this.workers.indexOf(worker)
339 this.workers.splice(workerIndex, 1)
340 this.tasks.delete(worker)
341 }
342
280c2a77
S
343 /**
344 * Choose a worker for the next task.
345 *
346 * The default implementation uses a round robin algorithm to distribute the load.
347 *
348 * @returns Worker.
349 */
350 protected chooseWorker (): Worker {
a35560ba 351 return this.workerChoiceStrategyContext.execute()
c97c7edb
S
352 }
353
280c2a77
S
354 /**
355 * Send a message to the given worker.
356 *
357 * @param worker The worker which should receive the message.
358 * @param message The message.
359 */
360 protected abstract sendToWorker (
361 worker: Worker,
362 message: MessageValue<Data>
363 ): void
364
4a6952ff
JB
365 /**
366 * Register a listener callback on a given worker.
367 *
368 * @param worker A worker.
369 * @param listener A message listener callback.
370 */
371 protected abstract registerWorkerMessageListener<
4f7fa42a
S
372 Message extends Data | Response
373 > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 374
280c2a77
S
375 protected internalExecute (
376 worker: Worker,
377 messageId: number
378 ): Promise<Response> {
14916bf9 379 this.increaseWorkersTask(worker)
be0676b3
APA
380 return new Promise<Response>((resolve, reject) => {
381 this.promiseMap.set(messageId, { resolve, reject, worker })
c97c7edb
S
382 })
383 }
384
729c563d
S
385 /**
386 * Returns a newly created worker.
387 */
280c2a77 388 protected abstract createWorker (): Worker
c97c7edb 389
729c563d
S
390 /**
391 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
392 *
393 * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
394 *
395 * @param worker The newly created worker.
396 */
280c2a77 397 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 398
4a6952ff
JB
399 /**
400 * Creates a new worker for this pool and sets it up completely.
401 *
402 * @returns New, completely set up worker.
403 */
404 protected createAndSetupWorker (): Worker {
280c2a77
S
405 const worker: Worker = this.createWorker()
406
a35560ba
S
407 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
408 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
409 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
45dbbb14 410 worker.once('exit', () => this.removeWorker(worker))
280c2a77 411
c97c7edb 412 this.workers.push(worker)
280c2a77
S
413
414 // Init tasks map
c97c7edb 415 this.tasks.set(worker, 0)
280c2a77
S
416
417 this.afterWorkerSetup(worker)
418
c97c7edb
S
419 return worker
420 }
be0676b3
APA
421
422 /**
423 * This function is the listener registered for each worker.
424 *
425 * @returns The listener function to execute when a message is sent from a worker.
426 */
427 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 428 return message => {
be0676b3
APA
429 if (message.id) {
430 const value = this.promiseMap.get(message.id)
431 if (value) {
432 this.decreaseWorkersTasks(value.worker)
433 if (message.error) value.reject(message.error)
434 else value.resolve(message.data as Response)
435 this.promiseMap.delete(message.id)
436 }
437 }
438 }
be0676b3 439 }
7c0ba920
JB
440
441 private checkAndEmitBusy (): void {
442 if (this.opts.enableEvents && this.busy) {
443 this.emitter?.emit('busy')
444 }
445 }
c97c7edb 446}