Fix documentation generation
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
be0676b3
APA
1import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4} from '../utility-types'
6e9d10db 5import { EMPTY_FUNCTION } from '../utils'
4a6952ff 6import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
bdaf31cd 7import type { PoolOptions } from './pool'
b4904890 8import { PoolEmitter } from './pool'
bf9549ae 9import type { IPoolInternal, TasksUsage } from './pool-internal'
b4904890 10import { PoolType } from './pool-internal'
ea7a90d3 11import type { IPoolWorker } from './pool-worker'
a35560ba
S
12import {
13 WorkerChoiceStrategies,
bdaf31cd
JB
14 WorkerChoiceStrategy
15} from './selection-strategies/selection-strategies-types'
16import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 17
/**
 * Base class that implements some shared logic for all poolifier pools.
 *
 * It owns the workers registry, the per-worker tasks usage statistics and the
 * map of pending task promises. Concrete pools supply the worker specific
 * operations through the abstract members (`createWorker`, `sendToWorker`,
 * `registerWorkerMessageListener`, `destroyWorker`, `afterWorkerSetup`,
 * `isMain`, `type` and `busy`).
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IPoolWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly workers: Worker[] = []

  /** @inheritDoc */
  public readonly workersTasksUsage: Map<Worker, TasksUsage> = new Map<
    Worker,
    TasksUsage
  >()

  /** @inheritDoc */
  public readonly emitter?: PoolEmitter

  /** @inheritDoc */
  public readonly max?: number

  /**
   * The promise map.
   *
   * - `key`: This is the message Id of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
   */
  protected promiseMap: Map<
    number,
    PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /**
   * Id of the next message.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Default to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * The arguments are validated, `setupHook()` is run, then `numberOfWorkers`
   * workers are created and set up before the event emitter and the worker
   * choice strategy context are instantiated.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool.
   * @throws Error if called from a worker, or if `numberOfWorkers` or `filePath` is invalid.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (this.isMain() === false) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    if (this.opts.enableEvents === true) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      // Callback handed to the strategy context to create one more worker on demand
      // NOTE(review): presumably only invoked for dynamic pools - confirm in WorkerChoiceStrategyContext
      () => {
        const workerCreated = this.createAndSetupWorker()
        this.registerWorkerMessageListener(workerCreated, message => {
          // Only a kill message may trigger the destruction: without the `message.kill` check,
          // any message received while the worker is idle (e.g. a task response) would destroy it
          if (
            isKillBehavior(KillBehaviors.HARD, message.kill) ||
            (message.kill != null &&
              this.getWorkerRunningTasks(workerCreated) === 0)
          ) {
            // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
            this.destroyWorker(workerCreated) as void
          }
        })
        return workerCreated
      },
      this.opts.workerChoiceStrategy
    )
  }

  /**
   * Checks that a worker file path has been given.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if `filePath` is falsy.
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Checks that the number of workers is usable for this pool.
   *
   * @param numberOfWorkers Number of workers to validate.
   * @throws Error if the number is `null`/`undefined`, not a safe integer, negative, or zero for a pool of type `PoolType.FIXED`.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (Number.isSafeInteger(numberOfWorkers) === false) {
      throw new Error(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new Error(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Fills in the defaults of the pool options, writing them to `this.opts`:
   * the worker choice strategy defaults to `WorkerChoiceStrategies.ROUND_ROBIN`
   * and `enableEvents` defaults to `true`.
   *
   * @param opts Options given to the pool.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    this.opts.workerChoiceStrategy =
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    this.opts.enableEvents = opts.enableEvents ?? true
  }

  /** @inheritDoc */
  public abstract get type (): PoolType

  /** @inheritDoc */
  public get numberOfRunningTasks (): number {
    // One promise map entry exists per task still waiting for its response
    return this.promiseMap.size
  }

  /** @inheritDoc */
  public getWorkerIndex (worker: Worker): number {
    return this.workers.indexOf(worker)
  }

  /** @inheritDoc */
  public getWorkerRunningTasks (worker: Worker): number | undefined {
    return this.workersTasksUsage.get(worker)?.running
  }

  /** @inheritDoc */
  public getWorkerAverageTasksRunTime (worker: Worker): number | undefined {
    return this.workersTasksUsage.get(worker)?.avgRunTime
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    // The tasks usage statistics of every worker are restarted from zero for the new strategy
    for (const worker of this.workers) {
      this.resetWorkerTasksUsage(worker)
    }
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritDoc */
  public abstract get busy (): boolean

  /**
   * Computes a busy status that pool implementations can use for `busy`.
   *
   * @returns `true` if there are at least as many running tasks as `numberOfWorkers` and no worker is free. `false` otherwise.
   */
  protected internalGetBusyStatus (): boolean {
    return (
      this.numberOfRunningTasks >= this.numberOfWorkers &&
      this.findFreeWorker() === false
    )
  }

  /** @inheritDoc */
  public findFreeWorker (): Worker | false {
    for (const worker of this.workers) {
      if (this.getWorkerRunningTasks(worker) === 0) {
        // A worker is free, return the matching worker
        return worker
      }
    }
    return false
  }

  /** @inheritDoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    // The promise is registered under the current message id before the message is sent
    const res = this.internalExecute(worker, this.nextMessageId)
    this.checkAndEmitBusy()
    this.sendToWorker(worker, {
      // `null`/`undefined` data is sent as an empty object
      data: data ?? ({} as Data),
      id: this.nextMessageId
    })
    ++this.nextMessageId
    return res
  }

  /** @inheritDoc */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /**
   * Shutdowns given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Hook executed before the worker task promise resolution.
   * Can be overridden.
   *
   * The default implementation increases the number of running tasks of the worker.
   *
   * @param worker The worker.
   */
  protected beforePromiseWorkerResponseHook (worker: Worker): void {
    this.increaseWorkerRunningTasks(worker)
  }

  /**
   * Hook executed after the worker task promise resolution.
   * Can be overridden.
   *
   * The default implementation decreases the number of running tasks of the worker,
   * increases its number of run tasks by one and updates its tasks runtime.
   *
   * @param message The received message.
   * @param promise The Promise response.
   */
  protected afterPromiseWorkerResponseHook (
    message: MessageValue<Response>,
    promise: PromiseWorkerResponseWrapper<Worker, Response>
  ): void {
    this.decreaseWorkerRunningTasks(promise.worker)
    this.stepWorkerRunTasks(promise.worker, 1)
    this.updateWorkerTasksRunTime(promise.worker, message.taskRunTime)
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not registered in `workers` leaves the registry untouched.
   *
   * @param worker The worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.getWorkerIndex(worker)
    // `splice(-1, 1)` would remove the last registered worker, so an unknown worker must be skipped
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.removeWorkerTasksUsage(worker)
  }

  /**
   * Chooses a worker for the next task.
   *
   * The choice is delegated to the worker choice strategy context.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Sends a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Registers a listener callback on a given worker.
   *
   * @param worker The worker which should register a listener.
   * @param listener The message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Registers a pending task for the given worker.
   *
   * Runs `beforePromiseWorkerResponseHook()`, then stores the resolve function, the reject
   * function and the worker in the promise map under the given message id.
   *
   * @param worker The worker that will run the task.
   * @param messageId Id of the message carrying the task.
   * @returns Promise settled by the worker listener once the matching response is received.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    this.beforePromiseWorkerResponseHook(worker)
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * The handlers given in the pool options are attached to the worker events
   * (an empty function stands in for each missing handler), and the worker is
   * added to the workers registry with fresh tasks usage statistics.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker = this.createWorker()

    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // An exited worker is removed from the pool data structures
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init worker tasks usage map
    this.initWorkerTasksUsage(worker)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * The returned listener ignores messages without an id and messages whose id
   * has no entry in the promise map. Otherwise it rejects the promise with the
   * message error if there is one, resolves it with the message data if not,
   * and runs `afterPromiseWorkerResponseHook()`.
   *
   * @returns The listener function to execute when a message is received from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      const messageId = message.id
      if (messageId !== undefined) {
        const promise = this.promiseMap.get(messageId)
        if (promise !== undefined) {
          try {
            if (message.error) {
              promise.reject(message.error)
            } else {
              promise.resolve(message.data as Response)
            }
            this.afterPromiseWorkerResponseHook(message, promise)
          } finally {
            // The entry is dropped even if the hook throws, so the settled task cannot leak in the map
            this.promiseMap.delete(messageId)
          }
        }
      }
    }
  }

  /**
   * Emits the `busy` event on the pool emitter if events are enabled and the pool is busy.
   */
  private checkAndEmitBusy (): void {
    if (this.opts.enableEvents === true && this.busy === true) {
      this.emitter?.emit('busy')
    }
  }

  /**
   * Increases the number of tasks that the given worker has applied.
   *
   * @param worker Worker which running tasks is increased.
   */
  private increaseWorkerRunningTasks (worker: Worker): void {
    this.stepWorkerRunningTasks(worker, 1)
  }

  /**
   * Decreases the number of tasks that the given worker has applied.
   *
   * @param worker Worker which running tasks is decreased.
   */
  private decreaseWorkerRunningTasks (worker: Worker): void {
    this.stepWorkerRunningTasks(worker, -1)
  }

  /**
   * Steps the number of tasks that the given worker has applied.
   *
   * @param worker Worker which running tasks are stepped.
   * @param step Number of running tasks step.
   * @throws Error if the worker is not registered in the workers tasks usage map.
   */
  private stepWorkerRunningTasks (worker: Worker, step: number): void {
    if (this.checkWorkerTasksUsage(worker) === true) {
      const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
      tasksUsage.running = tasksUsage.running + step
      this.workersTasksUsage.set(worker, tasksUsage)
    }
  }

  /**
   * Steps the number of tasks that the given worker has run.
   *
   * @param worker Worker which has run tasks.
   * @param step Number of run tasks step.
   * @throws Error if the worker is not registered in the workers tasks usage map.
   */
  private stepWorkerRunTasks (worker: Worker, step: number): void {
    if (this.checkWorkerTasksUsage(worker) === true) {
      const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
      tasksUsage.run = tasksUsage.run + step
      this.workersTasksUsage.set(worker, tasksUsage)
    }
  }

  /**
   * Updates tasks runtime for the given worker.
   *
   * Nothing is updated unless the current worker choice strategy requires the
   * runtime statistics. The average runtime is only recomputed once the worker
   * has run at least one task.
   *
   * @param worker Worker which run the task.
   * @param taskRunTime Worker task runtime. `undefined` is counted as `0`.
   * @throws Error if the statistics are required and the worker is not registered in the workers tasks usage map.
   */
  private updateWorkerTasksRunTime (
    worker: Worker,
    taskRunTime: number | undefined
  ): void {
    if (
      this.workerChoiceStrategyContext.getWorkerChoiceStrategy()
        .requiredStatistics.runTime === true &&
      this.checkWorkerTasksUsage(worker) === true
    ) {
      const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
      tasksUsage.runTime += taskRunTime ?? 0
      if (tasksUsage.run !== 0) {
        tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
      }
      this.workersTasksUsage.set(worker, tasksUsage)
    }
  }

  /**
   * Checks if the given worker is registered in the workers tasks usage map.
   *
   * @param worker Worker to check.
   * @returns `true` if the worker is registered in the workers tasks usage map.
   * @throws Error if the worker is not registered in the workers tasks usage map.
   */
  private checkWorkerTasksUsage (worker: Worker): boolean {
    const hasTasksUsage = this.workersTasksUsage.has(worker)
    if (hasTasksUsage === false) {
      throw new Error('Worker could not be found in workers tasks usage map')
    }
    return hasTasksUsage
  }

  /**
   * Initializes tasks usage statistics.
   *
   * Every counter of the worker (`run`, `running`, `runTime`, `avgRunTime`) is set to `0`.
   *
   * @param worker The worker.
   */
  public initWorkerTasksUsage (worker: Worker): void {
    this.workersTasksUsage.set(worker, {
      run: 0,
      running: 0,
      runTime: 0,
      avgRunTime: 0
    })
  }

  /**
   * Removes worker tasks usage statistics.
   *
   * @param worker The worker.
   */
  private removeWorkerTasksUsage (worker: Worker): void {
    this.workersTasksUsage.delete(worker)
  }

  /**
   * Resets worker tasks usage statistics.
   *
   * @param worker The worker.
   */
  private resetWorkerTasksUsage (worker: Worker): void {
    this.removeWorkerTasksUsage(worker)
    this.initWorkerTasksUsage(worker)
  }
}