X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=f6ed280cc3bbcf39b6a4aed1faa37603bf17786d;hb=45dbbb14328a173cad05ddcf21b5acf7f6460bb8;hp=d169afadb1df863cfbf8f37f1dbd623b211f3b07;hpb=cda5cc74c77bdfc37b220ef19637876e221b5061;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index d169afad..f6ed280c 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,16 +2,34 @@ import EventEmitter from 'events' import type { MessageValue } from '../utility-types' import type { IPool } from './pool' +/** + * Callback invoked if the worker raised an error. + */ export type ErrorHandler = (this: Worker, e: Error) => void + +/** + * Callback invoked when the worker has started successfully. + */ export type OnlineHandler = (this: Worker) => void + +/** + * Callback invoked when the worker exits successfully. + */ export type ExitHandler = (this: Worker, code: number) => void +/** + * Basic interface that describes the minimum required implementation of listener events for a pool-worker. + */ export interface IWorker { on(event: 'error', handler: ErrorHandler): void on(event: 'online', handler: OnlineHandler): void on(event: 'exit', handler: ExitHandler): void + once(event: 'exit', handler: ExitHandler): void } +/** + * Options for a poolifier pool. + */ export interface PoolOptions { /** * A function that will listen for error event on each worker. @@ -26,36 +44,72 @@ export interface PoolOptions { */ exitHandler?: ExitHandler /** - * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters). + * This is just to avoid non-useful warning messages. + * + * Will be used to set `maxListeners` on event emitters (workers are event emitters). * * @default 1000 + * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n) */ maxTasks?: number } +/** + * Internal poolifier pool emitter. + */ class PoolEmitter extends EventEmitter {} +/** + * Base class containing some shared logic for all poolifier pools. + * + * @template Worker Type of worker which manages this pool. + * @template Data Type of data sent to the worker. + * @template Response Type of response of execution. + */ export abstract class AbstractPool< Worker extends IWorker, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - Data = any, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - Response = any + Data = unknown, + Response = unknown > implements IPool { + /** + * List of currently available workers. + */ public readonly workers: Worker[] = [] - public nextWorker: number = 0 /** - * `workerId` as key and an integer value + * Index for the next worker. + */ + public nextWorkerIndex: number = 0 + + /** + * - `key`: The `Worker` + * - `value`: Number of tasks that has been assigned to that worker since it started */ public readonly tasks: Map = new Map() + /** + * Emitter on which events can be listened to. + * + * Events that can currently be listened to: + * + * - `'FullPool'` + */ public readonly emitter: PoolEmitter - protected id: number = 0 + /** + * ID of the next message. + */ + protected nextMessageId: number = 0 + /** + * Constructs a new poolifier pool. + * + * @param numberOfWorkers Number of workers that this pool should manage. + * @param filePath Path to the worker-file. + * @param opts Options for the pool. Default: `{ maxTasks: 1000 }` + */ public constructor ( - public readonly numWorkers: number, + public readonly numberOfWorkers: number, public readonly filePath: string, public readonly opts: PoolOptions = { maxTasks: 1000 } ) { @@ -69,57 +123,119 @@ export abstract class AbstractPool< this.setupHook() - for (let i = 1; i <= this.numWorkers; i++) { - this.internalNewWorker() + for (let i = 1; i <= this.numberOfWorkers; i++) { + this.createAndSetupWorker() } this.emitter = new PoolEmitter() } - protected setupHook (): void { - // Can be overridden + /** + * Number of workers that this pool should manage. + * + * @returns Number of workers that this pool manages. + * @deprecated Only here for backward compatibility. + */ + // eslint-disable-next-line spellcheck/spell-checker + public get numWorkers (): number { + return this.numberOfWorkers } - protected abstract isMain (): boolean + /** + * Index for the next worker. + * + * @returns Index for the next worker. + * @deprecated Only here for backward compatibility. + */ + public get nextWorker (): number { + return this.nextWorkerIndex + } + + public execute (data: Data): Promise { + // Configure worker to handle message with the specified task + const worker = this.chooseWorker() + this.increaseWorkersTask(worker) + const messageId = ++this.nextMessageId + const res = this.internalExecute(worker, messageId) + this.sendToWorker(worker, { data: data || ({} as Data), id: messageId }) + return res + } public async destroy (): Promise { - for (const worker of this.workers) { - await this.destroyWorker(worker) - } + await Promise.all(this.workers.map(worker => this.destroyWorker(worker))) } + /** + * Shut down given worker. + * + * @param worker A worker within `workers`. + */ protected abstract destroyWorker (worker: Worker): void | Promise - protected abstract sendToWorker ( - worker: Worker, - message: MessageValue - ): void + /** + * Setup hook that can be overridden by a Poolifier pool implementation + * to run code before workers are created in the abstract constructor. + */ + protected setupHook (): void { + // Can be overridden + } + + /** + * Should return whether the worker is the main worker or not. + */ + protected abstract isMain (): boolean - protected addWorker (worker: Worker): void { - const previousWorkerIndex = this.tasks.get(worker) - if (previousWorkerIndex !== undefined) { - this.tasks.set(worker, previousWorkerIndex + 1) + /** + * Increase the number of tasks that the given workers has done. + * + * @param worker Workers whose tasks are increased. + */ + protected increaseWorkersTask (worker: Worker): void { + const numberOfTasksTheWorkerHas = this.tasks.get(worker) + if (numberOfTasksTheWorkerHas !== undefined) { + this.tasks.set(worker, numberOfTasksTheWorkerHas + 1) } else { throw Error('Worker could not be found in tasks map') } } /** - * Execute the task specified into the constructor with the data parameter. + * Removes the given worker from the pool. * - * @param data The input for the task specified. - * @returns Promise that is resolved when the task is done. + * @param worker Worker that will be removed. */ - public execute (data: Data): Promise { - // configure worker to handle message with the specified task - const worker = this.chooseWorker() - this.addWorker(worker) - const id = ++this.id - const res = this.internalExecute(worker, id) - this.sendToWorker(worker, { data: data || ({} as Data), id: id }) - return res + protected removeWorker (worker: Worker): void { + // Clean worker from data structure + const workerIndex = this.workers.indexOf(worker) + this.workers.splice(workerIndex, 1) + this.tasks.delete(worker) } + /** + * Choose a worker for the next task. + * + * The default implementation uses a round robin algorithm to distribute the load. + * + * @returns Worker. + */ + protected chooseWorker (): Worker { + const chosenWorker = this.workers[this.nextWorkerIndex] + this.nextWorkerIndex++ + this.nextWorkerIndex %= this.workers.length + return chosenWorker + } + + /** + * Send a message to the given worker. + * + * @param worker The worker which should receive the message. + * @param message The message. + */ + protected abstract sendToWorker ( + worker: Worker, + message: MessageValue + ): void + protected abstract registerWorkerMessageListener ( port: Worker, listener: (message: MessageValue) => void @@ -130,12 +246,15 @@ export abstract class AbstractPool< listener: (message: MessageValue) => void ): void - protected internalExecute (worker: Worker, id: number): Promise { + protected internalExecute ( + worker: Worker, + messageId: number + ): Promise { return new Promise((resolve, reject) => { const listener: (message: MessageValue) => void = message => { - if (message.id === id) { + if (message.id === messageId) { this.unregisterWorkerMessageListener(worker, listener) - this.addWorker(worker) + this.increaseWorkersTask(worker) if (message.error) reject(message.error) else resolve(message.data as Response) } @@ -144,30 +263,40 @@ export abstract class AbstractPool< }) } - protected chooseWorker (): Worker { - if (this.workers.length - 1 === this.nextWorker) { - this.nextWorker = 0 - return this.workers[this.nextWorker] - } else { - this.nextWorker++ - return this.workers[this.nextWorker] - } - } + /** + * Returns a newly created worker. + */ + protected abstract createWorker (): Worker - protected abstract newWorker (): Worker + /** + * Function that can be hooked up when a worker has been newly created and moved to the workers registry. + * + * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default. + * + * @param worker The newly created worker. + */ + protected abstract afterWorkerSetup (worker: Worker): void - protected abstract afterNewWorkerPushed (worker: Worker): void + /** + * Creates a new worker for this pool and sets it up completely. + * + * @returns New, completely set up worker. + */ + protected createAndSetupWorker (): Worker { + const worker: Worker = this.createWorker() - protected internalNewWorker (): Worker { - const worker: Worker = this.newWorker() worker.on('error', this.opts.errorHandler ?? (() => {})) worker.on('online', this.opts.onlineHandler ?? (() => {})) - // TODO handle properly when a worker exit worker.on('exit', this.opts.exitHandler ?? (() => {})) + worker.once('exit', () => this.removeWorker(worker)) + this.workers.push(worker) - this.afterNewWorkerPushed(worker) - // init tasks map + + // Init tasks map this.tasks.set(worker, 0) + + this.afterWorkerSetup(worker) + return worker } }