X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=879e3e4f78f2b3ab60a5918a865fa7c711e3d882;hb=e5177d860cd10dd89b107cc6692e9989286b3a7c;hp=f6ed280cc3bbcf39b6a4aed1faa37603bf17786d;hpb=45dbbb14328a173cad05ddcf21b5acf7f6460bb8;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index f6ed280c..879e3e4f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -83,7 +83,7 @@ export abstract class AbstractPool< /** * - `key`: The `Worker` - * - `value`: Number of tasks that has been assigned to that worker since it started + * - `value`: Number of tasks currently in progress on the worker. */ public readonly tasks: Map = new Map() @@ -120,7 +120,6 @@ export abstract class AbstractPool< if (!this.filePath) { throw new Error('Please specify a file with a worker implementation') } - this.setupHook() for (let i = 1; i <= this.numberOfWorkers; i++) { @@ -199,6 +198,20 @@ export abstract class AbstractPool< } } + /** + * Increase the number of tasks that the given workers has done. + * + * @param worker Workers whose tasks are increased. + */ + protected decreaseWorkersTasks (worker: Worker): void { + const numberOfTasksTheWorkerHas = this.tasks.get(worker) + if (numberOfTasksTheWorkerHas !== undefined) { + this.tasks.set(worker, numberOfTasksTheWorkerHas - 1) + } else { + throw Error('Worker could not be found in tasks map') + } + } + /** * Removes the given worker from the pool. * @@ -220,8 +233,10 @@ export abstract class AbstractPool< */ protected chooseWorker (): Worker { const chosenWorker = this.workers[this.nextWorkerIndex] - this.nextWorkerIndex++ - this.nextWorkerIndex %= this.workers.length + this.nextWorkerIndex = + this.workers.length - 1 === this.nextWorkerIndex + ? 0 + : this.nextWorkerIndex + 1 return chosenWorker } @@ -236,15 +251,13 @@ export abstract class AbstractPool< message: MessageValue ): void - protected abstract registerWorkerMessageListener ( - port: Worker, - listener: (message: MessageValue) => void - ): void + protected abstract registerWorkerMessageListener< + Message extends Data | Response + > (worker: Worker, listener: (message: MessageValue) => void): void - protected abstract unregisterWorkerMessageListener ( - port: Worker, - listener: (message: MessageValue) => void - ): void + protected abstract unregisterWorkerMessageListener< + Message extends Data | Response + > (worker: Worker, listener: (message: MessageValue) => void): void protected internalExecute ( worker: Worker, @@ -254,7 +267,7 @@ export abstract class AbstractPool< const listener: (message: MessageValue) => void = message => { if (message.id === messageId) { this.unregisterWorkerMessageListener(worker, listener) - this.increaseWorkersTask(worker) + this.decreaseWorkersTasks(worker) if (message.error) reject(message.error) else resolve(message.data as Response) }