X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=9ba4fbf9c6dcc6d4d152891a3c19ad0fa01715a4;hb=e2473f60d4484d8ba10f972b7099550cd61c1730;hp=9f7813e74cd7bd2579081ef9fd83facef0c59fdf;hpb=027063571693f211b35c8851566a063201adb9af;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 9f7813e7..9ba4fbf9 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -17,7 +17,8 @@ import { PoolEmitter } from './pool' import type { IWorker, Task, TasksUsage, WorkerNode } from './worker' import { WorkerChoiceStrategies, - type WorkerChoiceStrategy + type WorkerChoiceStrategy, + type WorkerChoiceStrategyOptions } from './selection-strategies/selection-strategies-types' import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' import { CircularArray } from '../circular-array' @@ -144,16 +145,12 @@ export abstract class AbstractPool< this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false if (this.opts.enableTasksQueue) { - if ((opts.tasksQueueOptions?.concurrency as number) <= 0) { - throw new Error( - `Invalid worker tasks concurrency '${ - (opts.tasksQueueOptions as TasksQueueOptions).concurrency as number - }'` - ) - } - this.opts.tasksQueueOptions = { - concurrency: opts.tasksQueueOptions?.concurrency ?? 1 - } + this.checkValidTasksQueueOptions( + opts.tasksQueueOptions as TasksQueueOptions + ) + this.opts.tasksQueueOptions = this.buildTasksQueueOptions( + opts.tasksQueueOptions as TasksQueueOptions + ) } } @@ -167,6 +164,18 @@ export abstract class AbstractPool< } } + private checkValidTasksQueueOptions ( + tasksQueueOptions: TasksQueueOptions + ): void { + if ((tasksQueueOptions?.concurrency as number) <= 0) { + throw new Error( + `Invalid worker tasks concurrency '${ + tasksQueueOptions.concurrency as number + }'` + ) + } + } + /** @inheritDoc */ public abstract get type (): PoolType @@ -207,7 +216,8 @@ export abstract class AbstractPool< /** @inheritDoc */ public setWorkerChoiceStrategy ( - workerChoiceStrategy: WorkerChoiceStrategy + workerChoiceStrategy: WorkerChoiceStrategy, + workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions ): void { this.checkValidWorkerChoiceStrategy(workerChoiceStrategy) this.opts.workerChoiceStrategy = workerChoiceStrategy @@ -223,10 +233,52 @@ export abstract class AbstractPool< }) } this.workerChoiceStrategyContext.setWorkerChoiceStrategy( - workerChoiceStrategy + this.opts.workerChoiceStrategy + ) + if (workerChoiceStrategyOptions != null) { + this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) + } + } + + /** @inheritDoc */ + public setWorkerChoiceStrategyOptions ( + workerChoiceStrategyOptions: WorkerChoiceStrategyOptions + ): void { + this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.workerChoiceStrategyContext.setOptions( + this.opts.workerChoiceStrategyOptions ) } + /** @inheritDoc */ + public enableTasksQueue (enable: boolean, opts?: TasksQueueOptions): void { + if (this.opts.enableTasksQueue === true && !enable) { + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.flushTasksQueue(workerNodeKey) + } + } + this.opts.enableTasksQueue = enable + this.setTasksQueueOptions(opts as TasksQueueOptions) + } + + /** @inheritDoc */ + public setTasksQueueOptions (opts: TasksQueueOptions): void { + if (this.opts.enableTasksQueue === true) { + this.checkValidTasksQueueOptions(opts) + this.opts.tasksQueueOptions = this.buildTasksQueueOptions(opts) + } else { + delete this.opts.tasksQueueOptions + } + } + + private buildTasksQueueOptions ( + tasksQueueOptions: TasksQueueOptions + ): TasksQueueOptions { + return { + concurrency: tasksQueueOptions?.concurrency ?? 1 + } + } + /** * Whether the pool is full or not. * @@ -506,6 +558,7 @@ export abstract class AbstractPool< * Gets the given worker its tasks usage in the pool. * * @param worker - The worker. + * @throws Error if the worker is not found in the pool worker nodes. * @returns The worker tasks usage. */ private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {