refactor: prepare the code to handle task abstraction at execute
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
3 import { EMPTY_FUNCTION, median } from '../utils'
4 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
5 import { PoolEvents, type PoolOptions } from './pool'
6 import { PoolEmitter } from './pool'
7 import type { IPoolInternal } from './pool-internal'
8 import { PoolType } from './pool-internal'
9 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
10 import {
11 WorkerChoiceStrategies,
12 type WorkerChoiceStrategy
13 } from './selection-strategies/selection-strategies-types'
14 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
15 import { CircularArray } from '../circular-array'
16
/**
 * Base class that implements some shared logic for all poolifier pools.
 *
 * It owns the list of worker nodes, the map of pending task promises and the
 * worker choice strategy context. Concrete pools supply the worker specific
 * parts (creation, messaging, destruction) through the abstract members.
 *
 * @typeParam Worker - Type of worker which manages this pool.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

  /** @inheritDoc */
  public readonly emitter?: PoolEmitter

  /**
   * The promise response map.
   *
   * - `key`: The message id of each submitted task.
   * - `value`: An object that contains the worker, the promise resolve and reject callbacks.
   *
   * When a message is received from a worker, the entry matching the message id
   * is looked up to settle the promise bound to that task.
   */
  protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
  > = new Map<string, PromiseResponseWrapper<Worker, Response>>()

  /**
   * Worker choice strategy context referencing a worker choice algorithm implementation.
   *
   * Defaults to a round robin algorithm when no strategy is given in the pool options.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * @param numberOfWorkers - Number of workers that this pool should manage.
   * @param filePath - Path to the worker-file.
   * @param opts - Options for the pool.
   * @throws Error if called from a worker, or if an argument fails validation.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)

    // `Function.prototype.bind()` returns a new function and leaves the
    // receiver untouched: the bound function has to be assigned back,
    // otherwise the call is a no-op.
    this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
    this.internalExecute = this.internalExecute.bind(this)
    this.checkAndEmitFull = this.checkAndEmitFull.bind(this)
    this.checkAndEmitBusy = this.checkAndEmitBusy.bind(this)
    this.sendToWorker = this.sendToWorker.bind(this)

    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    if (this.opts.enableEvents === true) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
    >(this, this.opts.workerChoiceStrategy)
  }

  /**
   * Checks that a worker file path has been given.
   *
   * @param filePath - Path to the worker-file.
   * @throws Error if the path is `null`, `undefined` or a blank string.
   */
  private checkFilePath (filePath: string): void {
    if (
      filePath == null ||
      (typeof filePath === 'string' && filePath.trim().length === 0)
    ) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Checks that the number of workers is a non negative safe integer, and is
   * not zero for a fixed pool.
   *
   * @param numberOfWorkers - Number of workers that this pool should manage.
   * @throws Error, TypeError or RangeError depending on the failed check.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new TypeError(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new RangeError(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Fills in the pool options defaults (round robin strategy, events enabled)
   * on `this.opts` and validates the worker choice strategy.
   *
   * @param opts - Options for the pool.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    this.opts.workerChoiceStrategy =
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
    this.opts.enableEvents = opts.enableEvents ?? true
  }

  /**
   * Checks that the given value is one of the `WorkerChoiceStrategies` values.
   *
   * @param workerChoiceStrategy - The worker choice strategy to validate.
   * @throws Error if the strategy is unknown.
   */
  private checkValidWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
      throw new Error(
        `Invalid worker choice strategy '${workerChoiceStrategy}'`
      )
    }
  }

  /** @inheritDoc */
  public abstract get type (): PoolType

  /**
   * Number of tasks concurrently running in the pool, i.e. the number of
   * promises still waiting for a worker response.
   */
  private get numberOfRunningTasks (): number {
    return this.promiseResponseMap.size
  }

  /**
   * Gets the worker node key of the given worker.
   *
   * @param worker - The worker.
   * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
   */
  private getWorkerNodeKey (worker: Worker): number {
    return this.workerNodes.findIndex(
      workerNode => workerNode.worker === worker
    )
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    for (const [index, workerNode] of this.workerNodes.entries()) {
      this.setWorkerNode(
        index,
        workerNode.worker,
        // The statistics are reset, but the number of in-flight tasks is
        // kept: those tasks will still decrement `running` on completion,
        // and zeroing it here would drive the counter negative.
        this.createTasksUsage(workerNode.tasksUsage.running),
        workerNode.tasksQueue
      )
    }
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritDoc */
  public abstract get full (): boolean

  /** @inheritDoc */
  public abstract get busy (): boolean

  /**
   * Default busyness check available to concrete pools.
   *
   * @returns `true` if at least as many tasks as workers are running and no worker node is free.
   */
  protected internalBusy (): boolean {
    return (
      this.numberOfRunningTasks >= this.numberOfWorkers &&
      this.findFreeWorkerNodeKey() === -1
    )
  }

  /** @inheritDoc */
  public findFreeWorkerNodeKey (): number {
    return this.workerNodes.findIndex(workerNode => {
      return workerNode.tasksUsage?.running === 0
    })
  }

  /** @inheritDoc */
  public async execute (data: Data): Promise<Response> {
    const [workerNodeKey, workerNode] = this.chooseWorkerNode()
    const submittedTask: Task<Data> = {
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      id: crypto.randomUUID()
    }
    // Registers the promise callbacks under the task id before anything is
    // sent, so the response can always be matched.
    const res = this.internalExecute(workerNodeKey, workerNode, submittedTask)
    let currentTask: Task<Data>
    // FIXME: Add sensible conditions to start tasks queuing on the worker node.
    if (this.tasksQueueLength(workerNodeKey) > 0) {
      // Oldest queued task goes first, the submitted one waits in the queue.
      currentTask = this.dequeueTask(workerNodeKey) as Task<Data>
      this.enqueueTask(workerNodeKey, submittedTask)
    } else {
      currentTask = submittedTask
    }
    this.sendToWorker(workerNode.worker, currentTask)
    this.checkAndEmitFull()
    this.checkAndEmitBusy()
    // eslint-disable-next-line @typescript-eslint/return-await
    return res
  }

  /** @inheritDoc */
  public async destroy (): Promise<void> {
    await Promise.all(
      this.workerNodes.map(async workerNode => {
        await this.destroyWorker(workerNode.worker)
      })
    )
  }

  /**
   * Shutdowns the given worker.
   *
   * @param worker - A worker within `workerNodes`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook to run code before worker nodes are created in the abstract constructor.
   * Can be overridden.
   *
   * @virtual
   */
  protected setupHook (): void {
    // Intentionally empty
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Hook executed before the worker task promise resolution.
   * Can be overridden.
   *
   * The default implementation counts one more running task on the worker node.
   *
   * @param workerNodeKey - The worker node key.
   */
  protected beforePromiseResponseHook (workerNodeKey: number): void {
    ++this.workerNodes[workerNodeKey].tasksUsage.running
  }

  /**
   * Hook executed after the worker task promise resolution.
   * Can be overridden.
   *
   * The default implementation updates the tasks usage of the worker: running,
   * run and error counters, plus the run time statistics required by the
   * current worker choice strategy.
   *
   * @param worker - The worker.
   * @param message - The received message.
   * @throws Error if the worker is not found in the pool worker nodes.
   */
  protected afterPromiseResponseHook (
    worker: Worker,
    message: MessageValue<Response>
  ): void {
    const workerTasksUsage = this.getWorkerTasksUsage(worker) as TasksUsage
    --workerTasksUsage.running
    ++workerTasksUsage.run
    if (message.error != null) {
      ++workerTasksUsage.error
    }
    // Queried once: the requirements cannot change while one response is handled.
    const requiredStatistics =
      this.workerChoiceStrategyContext.getRequiredStatistics()
    if (requiredStatistics.runTime) {
      const taskRunTime = message.runTime ?? 0
      workerTasksUsage.runTime += taskRunTime
      if (requiredStatistics.avgRunTime && workerTasksUsage.run !== 0) {
        workerTasksUsage.avgRunTime =
          workerTasksUsage.runTime / workerTasksUsage.run
      }
      if (requiredStatistics.medRunTime) {
        workerTasksUsage.runTimeHistory.push(taskRunTime)
        workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
      }
    }
  }

  /**
   * Chooses a worker node for the next task.
   *
   * A dynamic pool that is not full and has no free worker node creates a new
   * worker and uses it. In every other case the choice is delegated to the
   * worker choice strategy context.
   *
   * @returns [worker node key, worker node].
   */
  protected chooseWorkerNode (): [number, WorkerNode<Worker, Data>] {
    let workerNodeKey: number
    if (
      this.type === PoolType.DYNAMIC &&
      !this.full &&
      this.findFreeWorkerNodeKey() === -1
    ) {
      const workerCreated = this.createAndSetupWorker()
      this.registerWorkerMessageListener(workerCreated, message => {
        if (
          isKillBehavior(KillBehaviors.HARD, message.kill) ||
          (message.kill != null &&
            this.getWorkerTasksUsage(workerCreated)?.running === 0)
        ) {
          // Kill message received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
          void this.destroyWorker(workerCreated)
        }
      })
      workerNodeKey = this.getWorkerNodeKey(workerCreated)
    } else {
      workerNodeKey = this.workerChoiceStrategyContext.execute()
    }
    return [workerNodeKey, this.workerNodes[workerNodeKey]]
  }

  /**
   * Sends a message to the given worker.
   *
   * @param worker - The worker which should receive the message.
   * @param message - The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Registers a listener callback on the given worker.
   *
   * @param worker - The worker which should register a listener.
   * @param listener - The message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  >(worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
   *
   * @param worker - The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker and sets it up completely in the pool worker nodes.
   *
   * The handlers given in the pool options are attached, the worker node is
   * removed from the pool once the worker exits, and `afterWorkerSetup()` is
   * called last.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker = this.createWorker()

    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => {
      this.removeWorkerNode(worker)
    })

    this.pushWorkerNode(worker)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * Messages without an id, or with an id that matches no pending task, are ignored.
   *
   * @returns The listener function to execute when a message is received from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      if (message.id == null) {
        return
      }
      // Task response received
      const promiseResponse = this.promiseResponseMap.get(message.id)
      if (promiseResponse == null) {
        return
      }
      if (message.error != null) {
        promiseResponse.reject(message.error)
      } else {
        promiseResponse.resolve(message.data as Response)
      }
      try {
        this.afterPromiseResponseHook(promiseResponse.worker, message)
      } finally {
        // The promise is settled: the entry must go even if the hook throws,
        // otherwise the running tasks count stays inflated forever.
        this.promiseResponseMap.delete(message.id)
      }
    }
  }

  /**
   * Runs the before hook and registers the promise callbacks of the task.
   *
   * @param workerNodeKey - The worker node key.
   * @param workerNode - The worker node the task is bound to.
   * @param task - The submitted task.
   * @returns A promise settled by the worker listener when the response for `task.id` is received.
   */
  private async internalExecute (
    workerNodeKey: number,
    workerNode: WorkerNode<Worker, Data>,
    task: Task<Data>
  ): Promise<Response> {
    this.beforePromiseResponseHook(workerNodeKey)
    return await new Promise<Response>((resolve, reject) => {
      this.promiseResponseMap.set(task.id, {
        resolve,
        reject,
        worker: workerNode.worker
      })
    })
  }

  /**
   * Emits the `busy` pool event if events are enabled and the pool is busy.
   */
  private checkAndEmitBusy (): void {
    if (this.opts.enableEvents === true && this.busy) {
      this.emitter?.emit(PoolEvents.busy)
    }
  }

  /**
   * Emits the `full` pool event if the pool is dynamic, events are enabled and the pool is full.
   */
  private checkAndEmitFull (): void {
    if (
      this.type === PoolType.DYNAMIC &&
      this.opts.enableEvents === true &&
      this.full
    ) {
      this.emitter?.emit(PoolEvents.full)
    }
  }

  /**
   * Gets the tasks usage of the given worker in the pool.
   *
   * @param worker - The worker.
   * @returns The worker tasks usage.
   * @throws Error if the worker is not found in the pool worker nodes.
   */
  private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    if (workerNodeKey !== -1) {
      return this.workerNodes[workerNodeKey].tasksUsage
    }
    throw new Error('Worker could not be found in the pool worker nodes')
  }

  /**
   * Builds a tasks usage with all the statistics zeroed.
   *
   * @param running - Number of tasks currently running on the worker.
   * @returns The fresh tasks usage.
   */
  private createTasksUsage (running = 0): TasksUsage {
    return {
      run: 0,
      running,
      runTime: 0,
      runTimeHistory: new CircularArray(),
      avgRunTime: 0,
      medRunTime: 0,
      error: 0
    }
  }

  /**
   * Pushes the given worker in the pool worker nodes, with a zeroed tasks
   * usage and an empty tasks queue.
   *
   * @param worker - The worker.
   * @returns The worker nodes length.
   */
  private pushWorkerNode (worker: Worker): number {
    return this.workerNodes.push({
      worker,
      tasksUsage: this.createTasksUsage(),
      tasksQueue: []
    })
  }

  /**
   * Sets the given worker in the pool worker nodes.
   *
   * @param workerNodeKey - The worker node key.
   * @param worker - The worker.
   * @param tasksUsage - The worker tasks usage.
   * @param tasksQueue - The worker task queue.
   */
  private setWorkerNode (
    workerNodeKey: number,
    worker: Worker,
    tasksUsage: TasksUsage,
    tasksQueue: Array<Task<Data>>
  ): void {
    this.workerNodes[workerNodeKey] = {
      worker,
      tasksUsage,
      tasksQueue
    }
  }

  /**
   * Removes the given worker from the pool worker nodes.
   *
   * Does nothing if the worker is not in the pool worker nodes.
   *
   * @param worker - The worker.
   */
  protected removeWorkerNode (worker: Worker): void {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // `splice(-1, 1)` would remove the last worker node: an unknown worker
    // must not evict an unrelated one.
    if (workerNodeKey === -1) {
      return
    }
    this.workerNodes.splice(workerNodeKey, 1)
    this.workerChoiceStrategyContext.remove(workerNodeKey)
  }

  /**
   * Appends a task to the tasks queue of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to queue.
   */
  protected enqueueTask (workerNodeKey: number, task: Task<Data>): void {
    this.workerNodes[workerNodeKey].tasksQueue.push(task)
  }

  /**
   * Removes the oldest task from the tasks queue of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The dequeued task, `undefined` if the queue is empty.
   */
  protected dequeueTask (workerNodeKey: number): Task<Data> | undefined {
    return this.workerNodes[workerNodeKey].tasksQueue.shift()
  }

  /**
   * Gets the tasks queue length of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The number of queued tasks.
   */
  protected tasksQueueLength (workerNodeKey: number): number {
    return this.workerNodes[workerNodeKey].tasksQueue.length
  }
}