X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=6b29669ff624a2ccdc58faebc2faa909125f97a0;hb=1641b313a20f6d9de889498a8f04f2e4fa6ca11f;hp=d169afadb1df863cfbf8f37f1dbd623b211f3b07;hpb=cda5cc74c77bdfc37b220ef19637876e221b5061;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index d169afad..6b29669f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,16 +2,33 @@ import EventEmitter from 'events' import type { MessageValue } from '../utility-types' import type { IPool } from './pool' +/** + * Callback invoked if the worker raised an error. + */ export type ErrorHandler = (this: Worker, e: Error) => void + +/** + * Callback invoked when the worker has started successfully. + */ export type OnlineHandler = (this: Worker) => void + +/** + * Callback invoked when the worker exits successfully. + */ export type ExitHandler = (this: Worker, code: number) => void +/** + * Basic interface that describes the minimum required implementation of listener events for a pool-worker. + */ export interface IWorker { on(event: 'error', handler: ErrorHandler): void on(event: 'online', handler: OnlineHandler): void on(event: 'exit', handler: ExitHandler): void } +/** + * Options for a poolifier pool. + */ export interface PoolOptions { /** * A function that will listen for error event on each worker. @@ -26,34 +43,70 @@ export interface PoolOptions { */ exitHandler?: ExitHandler /** - * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters). + * This is just to avoid non-useful warning messages. + * + * Will be used to set `maxListeners` on event emitters (workers are event emitters). * * @default 1000 + * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n) */ maxTasks?: number } +/** + * Internal poolifier pool emitter. + */ class PoolEmitter extends EventEmitter {} +/** + * Base class containing some shared logic for all poolifier pools. + * + * @template Worker Type of worker which manages this pool. + * @template Data Type of data sent to the worker. + * @template Response Type of response of execution. + */ export abstract class AbstractPool< Worker extends IWorker, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - Data = any, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - Response = any + Data = unknown, + Response = unknown > implements IPool { + /** + * List of currently available workers. + */ public readonly workers: Worker[] = [] + + /** + * ID for the next worker. + */ public nextWorker: number = 0 /** - * `workerId` as key and an integer value + * - `key`: The `Worker` + * - `value`: Number of tasks that has been assigned to that worker since it started */ public readonly tasks: Map = new Map() + /** + * Emitter on which events can be listened to. + * + * Events that can currently be listened to: + * + * - `'FullPool'` + */ public readonly emitter: PoolEmitter + /** + * ID of the next message. + */ protected id: number = 0 + /** + * Constructs a new poolifier pool. + * + * @param numWorkers Number of workers that this pool should manage. + * @param filePath Path to the worker-file. + * @param opts Options for the pool. Default: `{ maxTasks: 1000 }` + */ public constructor ( public readonly numWorkers: number, public readonly filePath: string, @@ -76,10 +129,17 @@ export abstract class AbstractPool< this.emitter = new PoolEmitter() } + /** + * Setup hook that can be overridden by a Poolifer pool implementation + * to run code before workers are created in the abstract constructor. + */ protected setupHook (): void { // Can be overridden } + /** + * Should return whether the worker is the main worker or not. + */ protected abstract isMain (): boolean public async destroy (): Promise { @@ -88,13 +148,29 @@ export abstract class AbstractPool< } } + /** + * Shut down given worker. + * + * @param worker A worker within `workers`. + */ protected abstract destroyWorker (worker: Worker): void | Promise + /** + * Send a message to the given worker. + * + * @param worker The worker which should receive the message. + * @param message The message. + */ protected abstract sendToWorker ( worker: Worker, message: MessageValue ): void + /** + * Adds the given worker to the pool. + * + * @param worker Worker that will be added. + */ protected addWorker (worker: Worker): void { const previousWorkerIndex = this.tasks.get(worker) if (previousWorkerIndex !== undefined) { @@ -105,13 +181,19 @@ export abstract class AbstractPool< } /** - * Execute the task specified into the constructor with the data parameter. + * Removes the given worker from the pool. * - * @param data The input for the task specified. - * @returns Promise that is resolved when the task is done. + * @param worker Worker that will be removed. */ + protected removeWorker (worker: Worker): void { + // Clean worker from data structure + const workerIndex = this.workers.indexOf(worker) + this.workers.splice(workerIndex, 1) + this.tasks.delete(worker) + } + public execute (data: Data): Promise { - // configure worker to handle message with the specified task + // Configure worker to handle message with the specified task const worker = this.chooseWorker() this.addWorker(worker) const id = ++this.id @@ -144,20 +226,38 @@ export abstract class AbstractPool< }) } + /** + * Choose a worker for the next task. + * + * The default implementation uses a round robin algorithm to distribute the load. + * + * @returns Worker. + */ protected chooseWorker (): Worker { - if (this.workers.length - 1 === this.nextWorker) { - this.nextWorker = 0 - return this.workers[this.nextWorker] - } else { - this.nextWorker++ - return this.workers[this.nextWorker] - } + this.nextWorker = + this.nextWorker === this.workers.length - 1 ? 0 : this.nextWorker + 1 + return this.workers[this.nextWorker] } + /** + * Returns a newly created worker. + */ protected abstract newWorker (): Worker + /** + * Function that can be hooked up when a worker has been newly created and moved to the workers registry. + * + * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default. + * + * @param worker The newly created worker. + */ protected abstract afterNewWorkerPushed (worker: Worker): void + /** + * Creates a new worker for this pool and sets it up completely. + * + * @returns New, completely set up worker. + */ protected internalNewWorker (): Worker { const worker: Worker = this.newWorker() worker.on('error', this.opts.errorHandler ?? (() => {}))