Fix strategy handling in pool options (#259)
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
be0676b3
APA
1import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4} from '../utility-types'
4a6952ff 5import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
a35560ba 6import type { IPoolInternal } from './pool-internal'
7c0ba920 7import { PoolEmitter, PoolType } from './pool-internal'
a35560ba
S
8import type { WorkerChoiceStrategy } from './selection-strategies'
9import {
10 WorkerChoiceStrategies,
11 WorkerChoiceStrategyContext
12} from './selection-strategies'
c97c7edb 13
c510fea7
APA
/**
 * Shared no-op callback.
 *
 * Registered as the worker event listener whenever the pool options do not
 * supply a handler, so listener registration never has to branch on `undefined`.
 */
const EMPTY_FUNCTION: () => void = () => {
  /* Intentionally empty */
}
20
729c563d
S
/**
 * Callback invoked if the worker raised an error.
 *
 * Called with the worker as `this` and the raised error as `e`.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
729c563d
S
25
/**
 * Callback invoked when the worker emits its online event, i.e. once it has started.
 *
 * Called with the worker as `this`; it receives no arguments.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
729c563d
S
30
/**
 * Callback invoked when the worker emits its exit event.
 *
 * Called with the worker as `this` and the `code` delivered with that event,
 * so it runs for every exit, not only for successful ones.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
35
729c563d
S
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 *
 * Every handler is typed against `this`, so it is invoked with the concrete
 * worker type as its `this` value.
 */
export interface IWorker {
  /**
   * Register a listener to the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Register a listener to the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Register a listener to the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Register a listener to the exit event that will only be performed once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
69
729c563d
S
/**
 * Options for a poolifier pool.
 *
 * Every option is optional; the pool fills in defaults for
 * `workerChoiceStrategy` and `enableEvents` when it is constructed.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * The worker choice strategy to use in this pool.
   *
   * Defaults to round robin.
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
  /**
   * Pool events emission.
   *
   * Defaults to `true`.
   */
  enableEvents?: boolean
}
97
729c563d
S
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * It owns the worker registry, the per-worker task counters and the map of
 * pending task promises; concrete pools supply worker creation, messaging
 * and destruction through the abstract members.
 *
 * @template Worker Type of worker managed by this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /** @inheritdoc */
  public readonly workers: Worker[] = []

  /** @inheritdoc */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** @inheritdoc */
  public readonly emitter?: PoolEmitter

  /** @inheritdoc */
  public readonly max?: number

  /**
   * The promise map.
   *
   * - `key`: This is the message ID of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
   */
  protected promiseMap: Map<
    number,
    PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /**
   * ID of the last submitted message.
   *
   * It is pre-incremented on every `execute` call, so the first task gets ID `1`.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Default to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * Validates the arguments, applies the option defaults, runs the setup hook
   * and then creates `numberOfWorkers` workers before wiring the event emitter
   * and the worker choice strategy.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool.
   * @throws Error if called from a worker, or if `numberOfWorkers` or `filePath` is invalid.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    if (this.opts.enableEvents) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      // Factory handed to the strategy context so it can create extra workers on demand.
      () => {
        const workerCreated = this.createAndSetupWorker()
        this.registerWorkerMessageListener(workerCreated, message => {
          const tasksInProgress = this.tasks.get(workerCreated)
          // NOTE(review): the second condition holds for any message received
          // while the worker has no task in progress, not only kill messages.
          // Confirm this is intended.
          if (
            isKillBehavior(KillBehaviors.HARD, message.kill) ||
            tasksInProgress === 0
          ) {
            // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
            void this.destroyWorker(workerCreated)
          }
        })
        return workerCreated
      },
      this.opts.workerChoiceStrategy
    )
  }

  /**
   * Ensures a worker file path was given.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if `filePath` is falsy.
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Ensures the requested number of workers is usable for this pool type.
   *
   * @param numberOfWorkers Number of workers requested.
   * @throws Error if the number is missing, not a safe integer, negative, or zero for a fixed pool.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new Error(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new Error(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Fills in the defaults of the pool options: round robin worker choice
   * strategy and events enabled.
   *
   * @param opts Options given to the pool.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    this.opts.workerChoiceStrategy =
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    this.opts.enableEvents = opts.enableEvents ?? true
  }

  /** @inheritdoc */
  public abstract get type (): PoolType

  /** @inheritdoc */
  public get numberOfRunningTasks (): number {
    return this.promiseMap.size
  }

  /** @inheritdoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritdoc */
  public abstract get busy (): boolean

  /**
   * Shared busy check: there are at least as many pending tasks as workers
   * and no worker has a task count of zero.
   *
   * @returns `true` if the pool is busy.
   */
  protected internalGetBusyStatus (): boolean {
    return (
      this.numberOfRunningTasks >= this.numberOfWorkers &&
      this.findFreeTasksMapEntry() === false
    )
  }

  /** @inheritdoc */
  public findFreeTasksMapEntry (): [Worker, number] | false {
    for (const [worker, numberOfTasks] of this.tasks) {
      if (numberOfTasks === 0) {
        // A worker is free, return the matching tasks map entry
        return [worker, numberOfTasks]
      }
    }
    return false
  }

  /** @inheritdoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    this.checkAndEmitBusy()
    const messageId = ++this.nextMessageId
    const res = this.internalExecute(worker, messageId)
    // Only replace a missing payload: `??` keeps falsy payloads such as 0, '' or false.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /** @inheritdoc */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase by one the number of tasks assigned to the given worker.
   *
   * @param worker Worker whose tasks are increased.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decrease by one the number of tasks assigned to the given worker.
   *
   * @param worker Worker whose tasks are decreased.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Step the number of tasks assigned to the given worker.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws Error if the worker has no entry in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress !== undefined) {
      this.tasks.set(worker, numberOfTasksInProgress + step)
    } else {
      throw new Error('Worker could not be found in tasks map')
    }
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not registered is ignored.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // Guard against -1: `splice(-1, 1)` would remove the last, unrelated worker.
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * Delegates to the worker choice strategy context, which applies the
   * currently configured worker choice strategy.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Register a listener callback on a given worker.
   *
   * @param worker A worker.
   * @param listener A message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Creates the promise of a submitted task and stores its settle functions
   * in the promise map under the message ID.
   *
   * @param worker Worker the task has been assigned to.
   * @param messageId ID of the message carrying the task.
   * @returns Promise settled by the worker listener when the matching response arrives.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * Registers the configured event handlers, removes the worker from the pool
   * on its first exit event, and adds it to the registry with a task count of zero.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * Messages without an ID, or whose ID has no pending promise, are ignored.
   *
   * @returns The listener function to execute when a message is sent from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      // Explicit nullish check rather than truthiness, so the test is about presence only.
      if (message.id != null) {
        const value = this.promiseMap.get(message.id)
        if (value) {
          this.decreaseWorkersTasks(value.worker)
          if (message.error) value.reject(message.error)
          else value.resolve(message.data as Response)
          this.promiseMap.delete(message.id)
        }
      }
    }
  }

  /**
   * Emits the `'busy'` event when events are enabled and the pool reports itself busy.
   */
  private checkAndEmitBusy (): void {
    if (this.opts.enableEvents && this.busy) {
      this.emitter?.emit('busy')
    }
  }
}