X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=ac3bd6cc7b02f0f7e592de747a5668a25d845f09;hb=838898f1dfd2d2456d3e9a832d24a7ef6f53be70;hp=b3198af3750252a738dca6b08d85da91ee94c599;hpb=5c5a1fb79906f18560012263239e7bd611ee8d2b;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index b3198af3..ac3bd6cc 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -24,6 +24,7 @@ export interface IWorker { on(event: 'error', handler: ErrorHandler): void on(event: 'online', handler: OnlineHandler): void on(event: 'exit', handler: ExitHandler): void + once(event: 'exit', handler: ExitHandler): void } /** @@ -76,9 +77,9 @@ export abstract class AbstractPool< public readonly workers: Worker[] = [] /** - * ID for the next worker. + * Index for the next worker. */ - public nextWorker: number = 0 + public nextWorkerIndex: number = 0 /** * - `key`: The `Worker` @@ -98,7 +99,7 @@ export abstract class AbstractPool< /** * ID of the next message. */ - protected id: number = 0 + protected nextMessageId: number = 0 /** * Constructs a new poolifier pool. @@ -123,7 +124,7 @@ export abstract class AbstractPool< this.setupHook() for (let i = 1; i <= this.numberOfWorkers; i++) { - this.internalNewWorker() + this.createAndSetupWorker() } this.emitter = new PoolEmitter() @@ -141,22 +142,27 @@ export abstract class AbstractPool< } /** - * Setup hook that can be overridden by a Poolifier pool implementation - * to run code before workers are created in the abstract constructor. + * Index for the next worker. + * + * @returns Index for the next worker. + * @deprecated Only here for backward compatibility. */ - protected setupHook (): void { - // Can be overridden + public get nextWorker (): number { + return this.nextWorkerIndex } - /** - * Should return whether the worker is the main worker or not. - */ - protected abstract isMain (): boolean + public execute (data: Data): Promise { + // Configure worker to handle message with the specified task + const worker = this.chooseWorker() + this.increaseWorkersTask(worker) + const messageId = ++this.nextMessageId + const res = this.internalExecute(worker, messageId) + this.sendToWorker(worker, { data: data || ({} as Data), id: messageId }) + return res + } public async destroy (): Promise { - for (const worker of this.workers) { - await this.destroyWorker(worker) - } + await Promise.all(this.workers.map(worker => this.destroyWorker(worker))) } /** @@ -167,25 +173,27 @@ export abstract class AbstractPool< protected abstract destroyWorker (worker: Worker): void | Promise /** - * Send a message to the given worker. - * - * @param worker The worker which should receive the message. - * @param message The message. + * Setup hook that can be overridden by a Poolifier pool implementation + * to run code before workers are created in the abstract constructor. */ - protected abstract sendToWorker ( - worker: Worker, - message: MessageValue - ): void + protected setupHook (): void { + // Can be overridden + } /** - * Adds the given worker to the pool. + * Should return whether the worker is the main worker or not. + */ + protected abstract isMain (): boolean + + /** + * Increase the number of tasks that the given workers has done. * - * @param worker Worker that will be added. + * @param worker Workers whose tasks are increased. */ - protected addWorker (worker: Worker): void { - const previousWorkerIndex = this.tasks.get(worker) - if (previousWorkerIndex !== undefined) { - this.tasks.set(worker, previousWorkerIndex + 1) + protected increaseWorkersTask (worker: Worker): void { + const numberOfTasksTheWorkerHas = this.tasks.get(worker) + if (numberOfTasksTheWorkerHas !== undefined) { + this.tasks.set(worker, numberOfTasksTheWorkerHas + 1) } else { throw Error('Worker could not be found in tasks map') } @@ -203,16 +211,33 @@ export abstract class AbstractPool< this.tasks.delete(worker) } - public execute (data: Data): Promise { - // Configure worker to handle message with the specified task - const worker = this.chooseWorker() - this.addWorker(worker) - const id = ++this.id - const res = this.internalExecute(worker, id) - this.sendToWorker(worker, { data: data || ({} as Data), id: id }) - return res + /** + * Choose a worker for the next task. + * + * The default implementation uses a round robin algorithm to distribute the load. + * + * @returns Worker. + */ + protected chooseWorker (): Worker { + const chosenWorker = this.workers[this.nextWorkerIndex] + this.nextWorkerIndex = + this.workers.length - 1 === this.nextWorkerIndex + ? 0 + : this.nextWorkerIndex + 1 + return chosenWorker } + /** + * Send a message to the given worker. + * + * @param worker The worker which should receive the message. + * @param message The message. + */ + protected abstract sendToWorker ( + worker: Worker, + message: MessageValue + ): void + protected abstract registerWorkerMessageListener ( port: Worker, listener: (message: MessageValue) => void @@ -223,12 +248,15 @@ export abstract class AbstractPool< listener: (message: MessageValue) => void ): void - protected internalExecute (worker: Worker, id: number): Promise { + protected internalExecute ( + worker: Worker, + messageId: number + ): Promise { return new Promise((resolve, reject) => { const listener: (message: MessageValue) => void = message => { - if (message.id === id) { + if (message.id === messageId) { this.unregisterWorkerMessageListener(worker, listener) - this.addWorker(worker) + this.increaseWorkersTask(worker) if (message.error) reject(message.error) else resolve(message.data as Response) } @@ -237,23 +265,10 @@ export abstract class AbstractPool< }) } - /** - * Choose a worker for the next task. - * - * The default implementation uses a round robin algorithm to distribute the load. - * - * @returns Worker. - */ - protected chooseWorker (): Worker { - this.nextWorker = - this.nextWorker === this.workers.length - 1 ? 0 : this.nextWorker + 1 - return this.workers[this.nextWorker] - } - /** * Returns a newly created worker. */ - protected abstract newWorker (): Worker + protected abstract createWorker (): Worker /** * Function that can be hooked up when a worker has been newly created and moved to the workers registry. @@ -262,23 +277,28 @@ export abstract class AbstractPool< * * @param worker The newly created worker. */ - protected abstract afterNewWorkerPushed (worker: Worker): void + protected abstract afterWorkerSetup (worker: Worker): void /** * Creates a new worker for this pool and sets it up completely. * * @returns New, completely set up worker. */ - protected internalNewWorker (): Worker { - const worker: Worker = this.newWorker() + protected createAndSetupWorker (): Worker { + const worker: Worker = this.createWorker() + worker.on('error', this.opts.errorHandler ?? (() => {})) worker.on('online', this.opts.onlineHandler ?? (() => {})) - // TODO handle properly when a worker exit worker.on('exit', this.opts.exitHandler ?? (() => {})) + worker.once('exit', () => this.removeWorker(worker)) + this.workers.push(worker) - this.afterNewWorkerPushed(worker) - // init tasks map + + // Init tasks map this.tasks.set(worker, 0) + + this.afterWorkerSetup(worker) + return worker } }