X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=0435a32c47067442af13cde6773a7831754e516f;hb=deb85c12b77faf6974551cefcd9676e62a392086;hp=0685f998d4aaf11d0cb93d86fcdb24b0e3476fbc;hpb=8780c880a2cf08680092f6e41fb82600b196455c;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 0685f998..0435a32c 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,6 +2,13 @@ import EventEmitter from 'events' import type { MessageValue } from '../utility-types' import type { IPool } from './pool' +/** + * An intentional empty function. + */ +function emptyFunction () { + // intentionally left blank +} + /** * Callback invoked if the worker raised an error. */ @@ -21,9 +28,33 @@ export type ExitHandler = (this: Worker, code: number) => void * Basic interface that describes the minimum required implementation of listener events for a pool-worker. */ export interface IWorker { + /** + * Register a listener to the error event. + * + * @param event `'error'`. + * @param handler The error handler. + */ on(event: 'error', handler: ErrorHandler): void + /** + * Register a listener to the online event. + * + * @param event `'online'`. + * @param handler The online handler. + */ on(event: 'online', handler: OnlineHandler): void + /** + * Register a listener to the exit event. + * + * @param event `'exit'`. + * @param handler The exit handler. + */ on(event: 'exit', handler: ExitHandler): void + /** + * Register a listener to the exit event that will only performed once. + * + * @param event `'exit'`. + * @param handler The exit handler. + */ once(event: 'exit', handler: ExitHandler): void } @@ -63,8 +94,8 @@ class PoolEmitter extends EventEmitter {} * Base class containing some shared logic for all poolifier pools. * * @template Worker Type of worker which manages this pool. - * @template Data Type of data sent to the worker. - * @template Response Type of response of execution. + * @template Data Type of data sent to the worker. This can only be serializable data. + * @template Response Type of response of execution. This can only be serializable data. */ export abstract class AbstractPool< Worker extends IWorker, @@ -82,6 +113,8 @@ export abstract class AbstractPool< public nextWorkerIndex: number = 0 /** + * The tasks map. + * * - `key`: The `Worker` * - `value`: Number of tasks currently in progress on the worker. */ @@ -116,10 +149,7 @@ export abstract class AbstractPool< if (!this.isMain()) { throw new Error('Cannot start a pool from a worker!') } - // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check - if (!this.filePath) { - throw new Error('Please specify a file with a worker implementation') - } + this.checkFilePath(this.filePath) this.setupHook() for (let i = 1; i <= this.numberOfWorkers; i++) { @@ -129,27 +159,18 @@ export abstract class AbstractPool< this.emitter = new PoolEmitter() } - /** - * Number of workers that this pool should manage. - * - * @returns Number of workers that this pool manages. - * @deprecated Only here for backward compatibility. - */ - // eslint-disable-next-line spellcheck/spell-checker - public get numWorkers (): number { - return this.numberOfWorkers + private checkFilePath (filePath: string) { + if (!filePath) { + throw new Error('Please specify a file with a worker implementation') + } } /** - * Index for the next worker. + * Perform the task specified in the constructor with the data parameter. * - * @returns Index for the next worker. - * @deprecated Only here for backward compatibility. + * @param data The input for the specified task. This can only be serializable data. + * @returns Promise that will be resolved when the task is successfully completed. */ - public get nextWorker (): number { - return this.nextWorkerIndex - } - public execute (data: Data): Promise { // Configure worker to handle message with the specified task const worker = this.chooseWorker() @@ -160,6 +181,9 @@ export abstract class AbstractPool< return res } + /** + * Shut down every current worker in this pool. + */ public async destroy (): Promise { await Promise.all(this.workers.map(worker => this.destroyWorker(worker))) } @@ -187,26 +211,31 @@ export abstract class AbstractPool< /** * Increase the number of tasks that the given workers has done. * - * @param worker Workers whose tasks are increased. + * @param worker Worker whose tasks are increased. */ protected increaseWorkersTask (worker: Worker): void { - const numberOfTasksInProgress = this.tasks.get(worker) - if (numberOfTasksInProgress !== undefined) { - this.tasks.set(worker, numberOfTasksInProgress + 1) - } else { - throw Error('Worker could not be found in tasks map') - } + this.stepWorkerNumberOfTasks(worker, 1) } /** * Decrease the number of tasks that the given workers has done. * - * @param worker Workers whose tasks are decreased. + * @param worker Worker whose tasks are decreased. */ protected decreaseWorkersTasks (worker: Worker): void { + this.stepWorkerNumberOfTasks(worker, -1) + } + + /** + * Step the number of tasks that the given workers has done. + * + * @param worker Worker whose tasks are set. + * @param step Worker number of tasks step. + */ + private stepWorkerNumberOfTasks (worker: Worker, step: number) { const numberOfTasksInProgress = this.tasks.get(worker) if (numberOfTasksInProgress !== undefined) { - this.tasks.set(worker, numberOfTasksInProgress - 1) + this.tasks.set(worker, numberOfTasksInProgress + step) } else { throw Error('Worker could not be found in tasks map') } @@ -298,9 +327,9 @@ export abstract class AbstractPool< protected createAndSetupWorker (): Worker { const worker: Worker = this.createWorker() - worker.on('error', this.opts.errorHandler ?? (() => {})) - worker.on('online', this.opts.onlineHandler ?? (() => {})) - worker.on('exit', this.opts.exitHandler ?? (() => {})) + worker.on('error', this.opts.errorHandler ?? emptyFunction) + worker.on('online', this.opts.onlineHandler ?? emptyFunction) + worker.on('exit', this.opts.exitHandler ?? emptyFunction) worker.once('exit', () => this.removeWorker(worker)) this.workers.push(worker)