X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=c951c6f23792ea631b8804e11c7ff25719ce5fa5;hb=17a9a09416d6c07f560eb0c79bd031d8c0125d4e;hp=6e4465edd7a262af497cf8e9b84942ca9b80d4f6;hpb=280c2a7728fbeb53612d8bc115a295d0255dd991;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 6e4465ed..c951c6f2 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,6 +2,13 @@ import EventEmitter from 'events' import type { MessageValue } from '../utility-types' import type { IPool } from './pool' +/** + * An intentional empty function. + */ +function emptyFunction () { + // intentionally left blank +} + /** * Callback invoked if the worker raised an error. */ @@ -21,9 +28,34 @@ export type ExitHandler = (this: Worker, code: number) => void * Basic interface that describes the minimum required implementation of listener events for a pool-worker. */ export interface IWorker { + /** + * Register a listener to the error event. + * + * @param event `'error'`. + * @param handler The error handler. + */ on(event: 'error', handler: ErrorHandler): void + /** + * Register a listener to the online event. + * + * @param event `'online'`. + * @param handler The online handler. + */ on(event: 'online', handler: OnlineHandler): void + /** + * Register a listener to the exit event. + * + * @param event `'exit'`. + * @param handler The exit handler. + */ on(event: 'exit', handler: ExitHandler): void + /** + * Register a listener to the exit event that will only performed once. + * + * @param event `'exit'`. + * @param handler The exit handler. + */ + once(event: 'exit', handler: ExitHandler): void } /** @@ -81,8 +113,10 @@ export abstract class AbstractPool< public nextWorkerIndex: number = 0 /** + * The tasks map. + * * - `key`: The `Worker` - * - `value`: Number of tasks that has been assigned to that worker since it started + * - `value`: Number of tasks currently in progress on the worker. */ public readonly tasks: Map = new Map() @@ -115,11 +149,7 @@ export abstract class AbstractPool< if (!this.isMain()) { throw new Error('Cannot start a pool from a worker!') } - // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check - if (!this.filePath) { - throw new Error('Please specify a file with a worker implementation') - } - + this.checkFilePath(this.filePath) this.setupHook() for (let i = 1; i <= this.numberOfWorkers; i++) { @@ -129,27 +159,18 @@ export abstract class AbstractPool< this.emitter = new PoolEmitter() } - /** - * Number of workers that this pool should manage. - * - * @returns Number of workers that this pool manages. - * @deprecated Only here for backward compatibility. - */ - // eslint-disable-next-line spellcheck/spell-checker - public get numWorkers (): number { - return this.numberOfWorkers + private checkFilePath (filePath: string) { + if (!filePath) { + throw new Error('Please specify a file with a worker implementation') + } } /** - * Index for the next worker. + * Perform the task specified in the constructor with the data parameter. * - * @returns Index for the next worker. - * @deprecated Only here for backward compatibility. + * @param data The input for the specified task. + * @returns Promise that will be resolved when the task is successfully completed. */ - public get nextWorker (): number { - return this.nextWorkerIndex - } - public execute (data: Data): Promise { // Configure worker to handle message with the specified task const worker = this.chooseWorker() @@ -160,10 +181,11 @@ export abstract class AbstractPool< return res } + /** + * Shut down every current worker in this pool. + */ public async destroy (): Promise { - for (const worker of this.workers) { - await this.destroyWorker(worker) - } + await Promise.all(this.workers.map(worker => this.destroyWorker(worker))) } /** @@ -189,12 +211,31 @@ export abstract class AbstractPool< /** * Increase the number of tasks that the given workers has done. * - * @param worker Workers whose tasks are increased. + * @param worker Worker whose tasks are increased. */ protected increaseWorkersTask (worker: Worker): void { - const numberOfTasksTheWorkerHas = this.tasks.get(worker) - if (numberOfTasksTheWorkerHas !== undefined) { - this.tasks.set(worker, numberOfTasksTheWorkerHas + 1) + this.stepWorkerNumberOfTasks(worker, 1) + } + + /** + * Decrease the number of tasks that the given workers has done. + * + * @param worker Worker whose tasks are decreased. + */ + protected decreaseWorkersTasks (worker: Worker): void { + this.stepWorkerNumberOfTasks(worker, -1) + } + + /** + * Step the number of tasks that the given workers has done. + * + * @param worker Worker whose tasks are set. + * @param step Worker number of tasks step. + */ + private stepWorkerNumberOfTasks (worker: Worker, step: number) { + const numberOfTasksInProgress = this.tasks.get(worker) + if (numberOfTasksInProgress !== undefined) { + this.tasks.set(worker, numberOfTasksInProgress + step) } else { throw Error('Worker could not be found in tasks map') } @@ -221,8 +262,10 @@ export abstract class AbstractPool< */ protected chooseWorker (): Worker { const chosenWorker = this.workers[this.nextWorkerIndex] - this.nextWorkerIndex++ - this.nextWorkerIndex %= this.workers.length + this.nextWorkerIndex = + this.workers.length - 1 === this.nextWorkerIndex + ? 0 + : this.nextWorkerIndex + 1 return chosenWorker } @@ -237,15 +280,13 @@ export abstract class AbstractPool< message: MessageValue ): void - protected abstract registerWorkerMessageListener ( - port: Worker, - listener: (message: MessageValue) => void - ): void + protected abstract registerWorkerMessageListener< + Message extends Data | Response + > (worker: Worker, listener: (message: MessageValue) => void): void - protected abstract unregisterWorkerMessageListener ( - port: Worker, - listener: (message: MessageValue) => void - ): void + protected abstract unregisterWorkerMessageListener< + Message extends Data | Response + > (worker: Worker, listener: (message: MessageValue) => void): void protected internalExecute ( worker: Worker, @@ -255,7 +296,7 @@ export abstract class AbstractPool< const listener: (message: MessageValue) => void = message => { if (message.id === messageId) { this.unregisterWorkerMessageListener(worker, listener) - this.increaseWorkersTask(worker) + this.decreaseWorkersTasks(worker) if (message.error) reject(message.error) else resolve(message.data as Response) } @@ -286,10 +327,10 @@ export abstract class AbstractPool< protected createAndSetupWorker (): Worker { const worker: Worker = this.createWorker() - worker.on('error', this.opts.errorHandler ?? (() => {})) - worker.on('online', this.opts.onlineHandler ?? (() => {})) - // TODO handle properly when a worker exit - worker.on('exit', this.opts.exitHandler ?? (() => {})) + worker.on('error', this.opts.errorHandler ?? emptyFunction) + worker.on('online', this.opts.onlineHandler ?? emptyFunction) + worker.on('exit', this.opts.exitHandler ?? emptyFunction) + worker.once('exit', () => this.removeWorker(worker)) this.workers.push(worker)