Fix documentation generation
[poolifier.git] / src / pools / abstract-pool.ts
1 import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4 } from '../utility-types'
5 import { EMPTY_FUNCTION } from '../utils'
6 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
7 import type { PoolOptions } from './pool'
8 import { PoolEmitter } from './pool'
9 import type { IPoolInternal, TasksUsage } from './pool-internal'
10 import { PoolType } from './pool-internal'
11 import type { IPoolWorker } from './pool-worker'
12 import {
13 WorkerChoiceStrategies,
14 WorkerChoiceStrategy
15 } from './selection-strategies/selection-strategies-types'
16 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
17
/**
 * Base class that implements some shared logic for all poolifier pools.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IPoolWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly workers: Worker[] = []

  /** @inheritDoc */
  public readonly workersTasksUsage: Map<Worker, TasksUsage> = new Map<
  Worker,
  TasksUsage
  >()

  /** @inheritDoc */
  public readonly emitter?: PoolEmitter

  /** @inheritDoc */
  public readonly max?: number

  /**
   * The promise map.
   *
   * - `key`: This is the message Id of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
   */
  protected promiseMap: Map<
  number,
  PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /**
   * Id of the next message.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Default to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * The arguments are validated first, then `setupHook()` runs, then
   * `numberOfWorkers` workers are created. The event emitter (when events are
   * enabled) and the worker choice strategy context are only set up afterwards.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool.
   * @throws Error if `isMain()` returns `false`, or if `numberOfWorkers` or `filePath` are invalid.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (this.isMain() === false) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    if (this.opts.enableEvents === true) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      () => {
        // Callback handed to the strategy context to create an extra worker on demand
        const workerCreated = this.createAndSetupWorker()
        this.registerWorkerMessageListener(workerCreated, message => {
          // Only a kill message may trigger the destruction: a hard kill always does,
          // any other kill value only does once the worker has no running task left.
          // Regular task responses carry no `kill` field and must not destroy the worker.
          if (
            isKillBehavior(KillBehaviors.HARD, message.kill) ||
            (message.kill != null &&
              this.getWorkerRunningTasks(workerCreated) === 0)
          ) {
            // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
            this.destroyWorker(workerCreated) as void
          }
        })
        return workerCreated
      },
      this.opts.workerChoiceStrategy
    )
  }

  /**
   * Checks that a worker file path has been given.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if `filePath` is falsy (e.g. empty string or `undefined`).
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Checks that the requested number of workers is usable.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @throws Error if the value is `null`/`undefined`, is not a safe integer, is negative,
   * or is `0` while the pool type is `PoolType.FIXED`.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (Number.isSafeInteger(numberOfWorkers) === false) {
      throw new Error(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new Error(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Fills in the defaults of the pool options, writing them to `this.opts`.
   *
   * The worker choice strategy defaults to round robin and events default to enabled.
   *
   * @param opts Options for the pool.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    this.opts.workerChoiceStrategy =
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    this.opts.enableEvents = opts.enableEvents ?? true
  }

  /** @inheritDoc */
  public abstract get type (): PoolType

  /** @inheritDoc */
  public get numberOfRunningTasks (): number {
    return this.promiseMap.size
  }

  /** @inheritDoc */
  public getWorkerIndex (worker: Worker): number {
    return this.workers.indexOf(worker)
  }

  /** @inheritDoc */
  public getWorkerRunningTasks (worker: Worker): number | undefined {
    return this.workersTasksUsage.get(worker)?.running
  }

  /** @inheritDoc */
  public getWorkerAverageTasksRunTime (worker: Worker): number | undefined {
    return this.workersTasksUsage.get(worker)?.avgRunTime
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    // Tasks usage statistics of every worker are reset before switching strategy
    for (const worker of this.workers) {
      this.resetWorkerTasksUsage(worker)
    }
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritDoc */
  public abstract get busy (): boolean

  /**
   * Computes a busy status from the pool counters.
   *
   * @returns `true` if the number of running tasks is at least `numberOfWorkers`
   * and no worker is free, `false` otherwise.
   */
  protected internalGetBusyStatus (): boolean {
    return (
      this.numberOfRunningTasks >= this.numberOfWorkers &&
      this.findFreeWorker() === false
    )
  }

  /** @inheritDoc */
  public findFreeWorker (): Worker | false {
    for (const worker of this.workers) {
      if (this.getWorkerRunningTasks(worker) === 0) {
        // A worker is free, return the matching worker
        return worker
      }
    }
    return false
  }

  /** @inheritDoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    const res = this.internalExecute(worker, this.nextMessageId)
    this.checkAndEmitBusy()
    this.sendToWorker(worker, {
      // A nullish payload is replaced by an empty object
      data: data ?? ({} as Data),
      id: this.nextMessageId
    })
    ++this.nextMessageId
    return res
  }

  /** @inheritDoc */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /**
   * Shutdowns given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Hook executed before the worker task promise resolution.
   * Can be overridden.
   *
   * @param worker The worker.
   */
  protected beforePromiseWorkerResponseHook (worker: Worker): void {
    this.increaseWorkerRunningTasks(worker)
  }

  /**
   * Hook executed after the worker task promise resolution.
   * Can be overridden.
   *
   * @param message The received message.
   * @param promise The Promise response.
   */
  protected afterPromiseWorkerResponseHook (
    message: MessageValue<Response>,
    promise: PromiseWorkerResponseWrapper<Worker, Response>
  ): void {
    this.decreaseWorkerRunningTasks(promise.worker)
    this.stepWorkerRunTasks(promise.worker, 1)
    this.updateWorkerTasksRunTime(promise.worker, message.taskRunTime)
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not (or no longer) part of `workers` leaves the workers
   * list untouched; its tasks usage entry, if any, is still removed.
   *
   * @param worker The worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.getWorkerIndex(worker)
    // `splice(-1, 1)` would drop the last worker, so only splice on a real match
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.removeWorkerTasksUsage(worker)
  }

  /**
   * Chooses a worker for the next task.
   *
   * The default implementation uses a round robin algorithm to distribute the load.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Sends a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Registers a listener callback on a given worker.
   *
   * @param worker The worker which should register a listener.
   * @param listener The message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Runs the before-response hook for the worker and registers the task promise.
   *
   * The promise resolvers are stored in the promise map under `messageId` so that
   * the worker listener can settle the promise when the matching message arrives.
   *
   * @param worker The worker the task is assigned to.
   * @param messageId Id of the message carrying the task.
   * @returns Promise settled by the worker listener.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    this.beforePromiseWorkerResponseHook(worker)
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * The handlers given in the pool options are attached (a no-op function is used
   * for the missing ones), the worker is registered in `workers` with fresh tasks
   * usage statistics, and `afterWorkerSetup()` is finally called.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker = this.createWorker()

    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // An exited worker is dropped from the pool data structures
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init worker tasks usage map
    this.initWorkerTasksUsage(worker)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * Messages without an `id`, or whose `id` has no entry in the promise map, are ignored.
   *
   * @returns The listener function to execute when a message is received from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      if (message.id !== undefined) {
        const promise = this.promiseMap.get(message.id)
        if (promise !== undefined) {
          if (message.error) {
            promise.reject(message.error)
          } else {
            promise.resolve(message.data as Response)
          }
          this.afterPromiseWorkerResponseHook(message, promise)
          this.promiseMap.delete(message.id)
        }
      }
    }
  }

  /**
   * Emits the `busy` event when events are enabled and the pool reports being busy.
   */
  private checkAndEmitBusy (): void {
    if (this.opts.enableEvents === true && this.busy === true) {
      this.emitter?.emit('busy')
    }
  }

  /**
   * Increases the number of tasks that the given worker has applied.
   *
   * @param worker Worker which running tasks is increased.
   */
  private increaseWorkerRunningTasks (worker: Worker): void {
    this.stepWorkerRunningTasks(worker, 1)
  }

  /**
   * Decreases the number of tasks that the given worker has applied.
   *
   * @param worker Worker which running tasks is decreased.
   */
  private decreaseWorkerRunningTasks (worker: Worker): void {
    this.stepWorkerRunningTasks(worker, -1)
  }

  /**
   * Steps the number of tasks that the given worker has applied.
   *
   * @param worker Worker which running tasks are stepped.
   * @param step Number of running tasks step.
   * @throws Error if the worker is not registered in the workers tasks usage map.
   */
  private stepWorkerRunningTasks (worker: Worker, step: number): void {
    if (this.checkWorkerTasksUsage(worker) === true) {
      const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
      tasksUsage.running = tasksUsage.running + step
      this.workersTasksUsage.set(worker, tasksUsage)
    }
  }

  /**
   * Steps the number of tasks that the given worker has run.
   *
   * @param worker Worker which has run tasks.
   * @param step Number of run tasks step.
   * @throws Error if the worker is not registered in the workers tasks usage map.
   */
  private stepWorkerRunTasks (worker: Worker, step: number): void {
    if (this.checkWorkerTasksUsage(worker) === true) {
      const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
      tasksUsage.run = tasksUsage.run + step
      this.workersTasksUsage.set(worker, tasksUsage)
    }
  }

  /**
   * Updates tasks runtime for the given worker.
   *
   * Nothing is updated unless the current worker choice strategy requires
   * the run time statistics.
   *
   * @param worker Worker which run the task.
   * @param taskRunTime Worker task runtime. Counted as `0` when `undefined`.
   */
  private updateWorkerTasksRunTime (
    worker: Worker,
    taskRunTime: number | undefined
  ): void {
    if (
      this.workerChoiceStrategyContext.getWorkerChoiceStrategy()
        .requiredStatistics.runTime === true &&
      this.checkWorkerTasksUsage(worker) === true
    ) {
      const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
      tasksUsage.runTime += taskRunTime ?? 0
      // Guard against a division by zero when no task has been run yet
      if (tasksUsage.run !== 0) {
        tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
      }
      this.workersTasksUsage.set(worker, tasksUsage)
    }
  }

  /**
   * Checks if the given worker is registered in the workers tasks usage map.
   *
   * @param worker Worker to check.
   * @returns `true` if the worker is registered in the workers tasks usage map.
   * @throws Error if the worker is not registered in the workers tasks usage map.
   */
  private checkWorkerTasksUsage (worker: Worker): boolean {
    const hasTasksUsage = this.workersTasksUsage.has(worker)
    if (hasTasksUsage === false) {
      throw new Error('Worker could not be found in workers tasks usage map')
    }
    return hasTasksUsage
  }

  /**
   * Initializes tasks usage statistics.
   *
   * Every counter of the worker (`run`, `running`, `runTime`, `avgRunTime`) is set to `0`.
   *
   * @param worker The worker.
   */
  initWorkerTasksUsage (worker: Worker): void {
    this.workersTasksUsage.set(worker, {
      run: 0,
      running: 0,
      runTime: 0,
      avgRunTime: 0
    })
  }

  /**
   * Removes worker tasks usage statistics.
   *
   * @param worker The worker.
   */
  private removeWorkerTasksUsage (worker: Worker): void {
    this.workersTasksUsage.delete(worker)
  }

  /**
   * Resets worker tasks usage statistics.
   *
   * @param worker The worker.
   */
  private resetWorkerTasksUsage (worker: Worker): void {
    this.removeWorkerTasksUsage(worker)
    this.initWorkerTasksUsage(worker)
  }
}