X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fabstract-pool.ts;h=ad21effb3bbb9837977a54c2f8c159c85af6b3c8;hb=b1989cfdf9af9f2c5e01b9a3f7c81b8c3ed78cb4;hp=c1e6ddc09a9a41ce18d34ced63a162225f66818f;hpb=3300e7bcb155358c2cc1eed6e4ac11581457616f;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c1e6ddc0..ad21effb 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -45,9 +45,9 @@ export abstract class AbstractPool< > = new Map>() /** - * Worker choice strategy instance implementing the worker choice algorithm. + * Worker choice strategy context referencing a worker choice algorithm implementation. * - * Default to a strategy implementing a round robin algorithm. + * Default to a round robin algorithm. */ protected workerChoiceStrategyContext: WorkerChoiceStrategyContext< Worker, @@ -76,6 +76,7 @@ export abstract class AbstractPool< this.chooseWorker.bind(this) this.internalExecute.bind(this) + this.checkAndEmitFull.bind(this) this.checkAndEmitBusy.bind(this) this.sendToWorker.bind(this) @@ -141,6 +142,15 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN + if ( + !Object.values(WorkerChoiceStrategies).includes( + this.opts.workerChoiceStrategy + ) + ) { + throw new Error( + `Invalid worker choice strategy '${this.opts.workerChoiceStrategy}'` + ) + } this.opts.enableEvents = opts.enableEvents ?? true } @@ -148,7 +158,7 @@ export abstract class AbstractPool< public abstract get type (): PoolType /** - * Number of tasks concurrently running. + * Number of tasks concurrently running in the pool. */ private get numberOfRunningTasks (): number { return this.promiseResponseMap.size @@ -179,7 +189,6 @@ export abstract class AbstractPool< }) } this.workerChoiceStrategyContext.setWorkerChoiceStrategy( - this, workerChoiceStrategy ) } @@ -209,6 +218,7 @@ export abstract class AbstractPool< const [workerKey, worker] = this.chooseWorker() const messageId = crypto.randomUUID() const res = this.internalExecute(workerKey, worker, messageId) + this.checkAndEmitFull() this.checkAndEmitBusy() this.sendToWorker(worker, { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions @@ -287,21 +297,10 @@ export abstract class AbstractPool< } } - /** - * Removes the given worker from the pool. - * - * @param worker - The worker that will be removed. - */ - protected removeWorker (worker: Worker): void { - const workerKey = this.getWorkerKey(worker) - this.workers.splice(workerKey, 1) - this.workerChoiceStrategyContext.remove(workerKey) - } - /** * Chooses a worker for the next task. * - * The default implementation uses a round robin algorithm to distribute the load. + * The default uses a round robin algorithm to distribute the load. * * @returns [worker key, worker]. */ @@ -381,9 +380,9 @@ export abstract class AbstractPool< */ protected workerListener (): (message: MessageValue) => void { return message => { - if (message.id !== undefined) { + if (message.id != null) { const promiseResponse = this.promiseResponseMap.get(message.id) - if (promiseResponse !== undefined) { + if (promiseResponse != null) { if (message.error != null) { promiseResponse.reject(message.error) } else { @@ -413,6 +412,16 @@ export abstract class AbstractPool< } } + private checkAndEmitFull (): void { + if ( + this.type === PoolType.DYNAMIC && + this.opts.enableEvents === true && + this.full + ) { + this.emitter?.emit('full') + } + } + /** * Gets worker tasks usage. * @@ -428,7 +437,7 @@ export abstract class AbstractPool< } /** - * Pushes the given worker. + * Pushes the given worker in the pool. * * @param worker - The worker. * @param tasksUsage - The worker tasks usage. @@ -441,7 +450,7 @@ export abstract class AbstractPool< } /** - * Sets the given worker. + * Sets the given worker in the pool. * * @param workerKey - The worker key. * @param worker - The worker. @@ -457,4 +466,15 @@ export abstract class AbstractPool< tasksUsage } } + + /** + * Removes the given worker from the pool. + * + * @param worker - The worker that will be removed. + */ + protected removeWorker (worker: Worker): void { + const workerKey = this.getWorkerKey(worker) + this.workers.splice(workerKey, 1) + this.workerChoiceStrategyContext.remove(workerKey) + } }