75b5cf796c8a6c198c48bb8ad843bd6eae33ba29
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
3 import { EMPTY_FUNCTION, median } from '../utils'
4 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
5 import { PoolEvents, type PoolOptions } from './pool'
6 import { PoolEmitter } from './pool'
7 import type { IPoolInternal, TasksUsage, WorkerType } from './pool-internal'
8 import { PoolType } from './pool-internal'
9 import type { IPoolWorker } from './pool-worker'
10 import {
11 WorkerChoiceStrategies,
12 type WorkerChoiceStrategy
13 } from './selection-strategies/selection-strategies-types'
14 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
15 import { CircularArray } from '../circular-array'
16
17 /**
18 * Base class that implements some shared logic for all poolifier pools.
19 *
20 * @typeParam Worker - Type of worker which manages this pool.
21 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
22 * @typeParam Response - Type of response of execution. This can only be serializable data.
23 */
24 export abstract class AbstractPool<
25 Worker extends IPoolWorker,
26 Data = unknown,
27 Response = unknown
28 > implements IPoolInternal<Worker, Data, Response> {
29 /** @inheritDoc */
30 public readonly workers: Array<WorkerType<Worker>> = []
31
32 /** @inheritDoc */
33 public readonly emitter?: PoolEmitter
34
35 /**
36 * The promise response map.
37 *
38 * - `key`: The message id of each submitted task.
39 * - `value`: An object that contains the worker, the promise resolve and reject callbacks.
40 *
41 * When we receive a message from the worker we get a map entry with the promise resolve/reject bound to the message.
42 */
43 protected promiseResponseMap: Map<
44 string,
45 PromiseResponseWrapper<Worker, Response>
46 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
47
48 /**
49 * Worker choice strategy context referencing a worker choice algorithm implementation.
50 *
51 * Default to a round robin algorithm.
52 */
53 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
54 Worker,
55 Data,
56 Response
57 >
58
59 /**
60 * Constructs a new poolifier pool.
61 *
62 * @param numberOfWorkers - Number of workers that this pool should manage.
63 * @param filePath - Path to the worker-file.
64 * @param opts - Options for the pool.
65 */
66 public constructor (
67 public readonly numberOfWorkers: number,
68 public readonly filePath: string,
69 public readonly opts: PoolOptions<Worker>
70 ) {
71 if (!this.isMain()) {
72 throw new Error('Cannot start a pool from a worker!')
73 }
74 this.checkNumberOfWorkers(this.numberOfWorkers)
75 this.checkFilePath(this.filePath)
76 this.checkPoolOptions(this.opts)
77
78 this.chooseWorker.bind(this)
79 this.internalExecute.bind(this)
80 this.checkAndEmitFull.bind(this)
81 this.checkAndEmitBusy.bind(this)
82 this.sendToWorker.bind(this)
83
84 this.setupHook()
85
86 for (let i = 1; i <= this.numberOfWorkers; i++) {
87 this.createAndSetupWorker()
88 }
89
90 if (this.opts.enableEvents === true) {
91 this.emitter = new PoolEmitter()
92 }
93 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
94 Worker,
95 Data,
96 Response
97 >(this, this.opts.workerChoiceStrategy)
98 }
99
100 private checkFilePath (filePath: string): void {
101 if (
102 filePath == null ||
103 (typeof filePath === 'string' && filePath.trim().length === 0)
104 ) {
105 throw new Error('Please specify a file with a worker implementation')
106 }
107 }
108
109 private checkNumberOfWorkers (numberOfWorkers: number): void {
110 if (numberOfWorkers == null) {
111 throw new Error(
112 'Cannot instantiate a pool without specifying the number of workers'
113 )
114 } else if (!Number.isSafeInteger(numberOfWorkers)) {
115 throw new TypeError(
116 'Cannot instantiate a pool with a non integer number of workers'
117 )
118 } else if (numberOfWorkers < 0) {
119 throw new RangeError(
120 'Cannot instantiate a pool with a negative number of workers'
121 )
122 } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
123 throw new Error('Cannot instantiate a fixed pool with no worker')
124 }
125 }
126
127 private checkPoolOptions (opts: PoolOptions<Worker>): void {
128 this.opts.workerChoiceStrategy =
129 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
130 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
131 this.opts.enableEvents = opts.enableEvents ?? true
132 }
133
134 private checkValidWorkerChoiceStrategy (
135 workerChoiceStrategy: WorkerChoiceStrategy
136 ): void {
137 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
138 throw new Error(
139 `Invalid worker choice strategy '${workerChoiceStrategy}'`
140 )
141 }
142 }
143
144 /** @inheritDoc */
145 public abstract get type (): PoolType
146
147 /**
148 * Number of tasks concurrently running in the pool.
149 */
150 private get numberOfRunningTasks (): number {
151 return this.promiseResponseMap.size
152 }
153
154 /**
155 * Gets the given worker key.
156 *
157 * @param worker - The worker.
158 * @returns The worker key if the worker is found in the pool, `-1` otherwise.
159 */
160 private getWorkerKey (worker: Worker): number {
161 return this.workers.findIndex(workerItem => workerItem.worker === worker)
162 }
163
164 /** @inheritDoc */
165 public setWorkerChoiceStrategy (
166 workerChoiceStrategy: WorkerChoiceStrategy
167 ): void {
168 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
169 this.opts.workerChoiceStrategy = workerChoiceStrategy
170 for (const [index, workerItem] of this.workers.entries()) {
171 this.setWorker(index, workerItem.worker, {
172 run: 0,
173 running: 0,
174 runTime: 0,
175 runTimeHistory: new CircularArray(),
176 avgRunTime: 0,
177 medRunTime: 0,
178 error: 0
179 })
180 }
181 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
182 workerChoiceStrategy
183 )
184 }
185
186 /** @inheritDoc */
187 public abstract get full (): boolean
188
189 /** @inheritDoc */
190 public abstract get busy (): boolean
191
192 protected internalBusy (): boolean {
193 return (
194 this.numberOfRunningTasks >= this.numberOfWorkers &&
195 this.findFreeWorkerKey() === -1
196 )
197 }
198
199 /** @inheritDoc */
200 public findFreeWorkerKey (): number {
201 return this.workers.findIndex(workerItem => {
202 return workerItem.tasksUsage.running === 0
203 })
204 }
205
206 /** @inheritDoc */
207 public async execute (data: Data): Promise<Response> {
208 const [workerKey, worker] = this.chooseWorker()
209 const messageId = crypto.randomUUID()
210 const res = this.internalExecute(workerKey, worker, messageId)
211 this.checkAndEmitFull()
212 this.checkAndEmitBusy()
213 this.sendToWorker(worker, {
214 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
215 data: data ?? ({} as Data),
216 id: messageId
217 })
218 // eslint-disable-next-line @typescript-eslint/return-await
219 return res
220 }
221
222 /** @inheritDoc */
223 public async destroy (): Promise<void> {
224 await Promise.all(
225 this.workers.map(async workerItem => {
226 await this.destroyWorker(workerItem.worker)
227 })
228 )
229 }
230
231 /**
232 * Shutdowns given worker in the pool.
233 *
234 * @param worker - A worker within `workers`.
235 */
236 protected abstract destroyWorker (worker: Worker): void | Promise<void>
237
238 /**
239 * Setup hook that can be overridden by a Poolifier pool implementation
240 * to run code before workers are created in the abstract constructor.
241 * Can be overridden
242 *
243 * @virtual
244 */
245 protected setupHook (): void {
246 // Intentionally empty
247 }
248
249 /**
250 * Should return whether the worker is the main worker or not.
251 */
252 protected abstract isMain (): boolean
253
254 /**
255 * Hook executed before the worker task promise resolution.
256 * Can be overridden.
257 *
258 * @param workerKey - The worker key.
259 */
260 protected beforePromiseResponseHook (workerKey: number): void {
261 ++this.workers[workerKey].tasksUsage.running
262 }
263
264 /**
265 * Hook executed after the worker task promise resolution.
266 * Can be overridden.
267 *
268 * @param worker - The worker.
269 * @param message - The received message.
270 */
271 protected afterPromiseResponseHook (
272 worker: Worker,
273 message: MessageValue<Response>
274 ): void {
275 const workerTasksUsage = this.getWorkerTasksUsage(worker) as TasksUsage
276 --workerTasksUsage.running
277 ++workerTasksUsage.run
278 if (message.error != null) {
279 ++workerTasksUsage.error
280 }
281 if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
282 workerTasksUsage.runTime += message.runTime ?? 0
283 if (
284 this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
285 workerTasksUsage.run !== 0
286 ) {
287 workerTasksUsage.avgRunTime =
288 workerTasksUsage.runTime / workerTasksUsage.run
289 }
290 if (this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime) {
291 workerTasksUsage.runTimeHistory.push(message.runTime ?? 0)
292 workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
293 }
294 }
295 }
296
297 /**
298 * Chooses a worker for the next task.
299 *
300 * The default uses a round robin algorithm to distribute the load.
301 *
302 * @returns [worker key, worker].
303 */
304 protected chooseWorker (): [number, Worker] {
305 let workerKey: number
306 if (
307 this.type === PoolType.DYNAMIC &&
308 !this.full &&
309 this.findFreeWorkerKey() === -1
310 ) {
311 const createdWorker = this.createAndSetupWorker()
312 this.registerWorkerMessageListener(createdWorker, message => {
313 if (
314 isKillBehavior(KillBehaviors.HARD, message.kill) ||
315 (message.kill != null &&
316 this.getWorkerTasksUsage(createdWorker)?.running === 0)
317 ) {
318 // Kill message received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
319 void this.destroyWorker(createdWorker)
320 }
321 })
322 workerKey = this.getWorkerKey(createdWorker)
323 } else {
324 workerKey = this.workerChoiceStrategyContext.execute()
325 }
326 return [workerKey, this.workers[workerKey].worker]
327 }
328
329 /**
330 * Sends a message to the given worker.
331 *
332 * @param worker - The worker which should receive the message.
333 * @param message - The message.
334 */
335 protected abstract sendToWorker (
336 worker: Worker,
337 message: MessageValue<Data>
338 ): void
339
340 /**
341 * Registers a listener callback on a given worker.
342 *
343 * @param worker - The worker which should register a listener.
344 * @param listener - The message listener callback.
345 */
346 protected abstract registerWorkerMessageListener<
347 Message extends Data | Response
348 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
349
350 /**
351 * Returns a newly created worker.
352 */
353 protected abstract createWorker (): Worker
354
355 /**
356 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
357 *
358 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
359 *
360 * @param worker - The newly created worker.
361 * @virtual
362 */
363 protected abstract afterWorkerSetup (worker: Worker): void
364
365 /**
366 * Creates a new worker for this pool and sets it up completely.
367 *
368 * @returns New, completely set up worker.
369 */
370 protected createAndSetupWorker (): Worker {
371 const worker = this.createWorker()
372
373 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
374 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
375 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
376 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
377 worker.once('exit', () => {
378 this.removeWorker(worker)
379 })
380
381 this.pushWorker(worker, {
382 run: 0,
383 running: 0,
384 runTime: 0,
385 runTimeHistory: new CircularArray(),
386 avgRunTime: 0,
387 medRunTime: 0,
388 error: 0
389 })
390
391 this.afterWorkerSetup(worker)
392
393 return worker
394 }
395
396 /**
397 * This function is the listener registered for each worker.
398 *
399 * @returns The listener function to execute when a message is received from a worker.
400 */
401 protected workerListener (): (message: MessageValue<Response>) => void {
402 return message => {
403 if (message.id != null) {
404 // Task response received
405 const promiseResponse = this.promiseResponseMap.get(message.id)
406 if (promiseResponse != null) {
407 if (message.error != null) {
408 promiseResponse.reject(message.error)
409 } else {
410 promiseResponse.resolve(message.data as Response)
411 }
412 this.afterPromiseResponseHook(promiseResponse.worker, message)
413 this.promiseResponseMap.delete(message.id)
414 }
415 }
416 }
417 }
418
419 private async internalExecute (
420 workerKey: number,
421 worker: Worker,
422 messageId: string
423 ): Promise<Response> {
424 this.beforePromiseResponseHook(workerKey)
425 return await new Promise<Response>((resolve, reject) => {
426 this.promiseResponseMap.set(messageId, { resolve, reject, worker })
427 })
428 }
429
430 private checkAndEmitBusy (): void {
431 if (this.opts.enableEvents === true && this.busy) {
432 this.emitter?.emit(PoolEvents.busy)
433 }
434 }
435
436 private checkAndEmitFull (): void {
437 if (
438 this.type === PoolType.DYNAMIC &&
439 this.opts.enableEvents === true &&
440 this.full
441 ) {
442 this.emitter?.emit(PoolEvents.full)
443 }
444 }
445
446 /**
447 * Gets the given worker tasks usage in the pool.
448 *
449 * @param worker - The worker.
450 * @returns The worker tasks usage.
451 */
452 private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
453 const workerKey = this.getWorkerKey(worker)
454 if (workerKey !== -1) {
455 return this.workers[workerKey].tasksUsage
456 }
457 throw new Error('Worker could not be found in the pool')
458 }
459
460 /**
461 * Pushes the given worker in the pool.
462 *
463 * @param worker - The worker.
464 * @param tasksUsage - The worker tasks usage.
465 */
466 private pushWorker (worker: Worker, tasksUsage: TasksUsage): void {
467 this.workers.push({
468 worker,
469 tasksUsage
470 })
471 }
472
473 /**
474 * Sets the given worker in the pool.
475 *
476 * @param workerKey - The worker key.
477 * @param worker - The worker.
478 * @param tasksUsage - The worker tasks usage.
479 */
480 private setWorker (
481 workerKey: number,
482 worker: Worker,
483 tasksUsage: TasksUsage
484 ): void {
485 this.workers[workerKey] = {
486 worker,
487 tasksUsage
488 }
489 }
490
491 /**
492 * Removes the given worker from the pool.
493 *
494 * @param worker - The worker that will be removed.
495 */
496 protected removeWorker (worker: Worker): void {
497 const workerKey = this.getWorkerKey(worker)
498 this.workers.splice(workerKey, 1)
499 this.workerChoiceStrategyContext.remove(workerKey)
500 }
501 }