X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=5d649f19da42790b617da8bc4a8c1385b98d1684;hb=18d5678258f31cff39813c412cb0bc12161120ae;hp=6761882589bb14bd9d205c33684ac7e7b3ab5128;hpb=5972baf82c740050a6da461a8deb8497fcaa8580;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 67618825..5d649f19 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -223,16 +223,18 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { this.opts.startWorkers = opts.startWorkers ?? true + this.checkValidWorkerChoiceStrategy( + opts.workerChoiceStrategy as WorkerChoiceStrategy + ) this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN - this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) + this.checkValidWorkerChoiceStrategyOptions( + opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions + ) this.opts.workerChoiceStrategyOptions = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts.workerChoiceStrategyOptions } - this.checkValidWorkerChoiceStrategyOptions( - this.opts.workerChoiceStrategyOptions - ) this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false @@ -252,7 +254,10 @@ export abstract class AbstractPool< private checkValidWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy ): void { - if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) { + if ( + workerChoiceStrategy != null && + !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy) + ) { throw new Error( `Invalid worker choice strategy '${workerChoiceStrategy}'` ) @@ -262,13 +267,16 @@ export abstract class AbstractPool< private checkValidWorkerChoiceStrategyOptions ( workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { - if (!isPlainObject(workerChoiceStrategyOptions)) { + if ( + workerChoiceStrategyOptions != null && + !isPlainObject(workerChoiceStrategyOptions) + ) { throw new TypeError( 'Invalid worker choice strategy options: must be a plain object' ) } if ( - workerChoiceStrategyOptions.retries != null && + workerChoiceStrategyOptions?.retries != null && !Number.isSafeInteger(workerChoiceStrategyOptions.retries) ) { throw new TypeError( @@ -276,7 +284,7 @@ export abstract class AbstractPool< ) } if ( - workerChoiceStrategyOptions.retries != null && + workerChoiceStrategyOptions?.retries != null && workerChoiceStrategyOptions.retries < 0 ) { throw new RangeError( @@ -284,7 +292,7 @@ export abstract class AbstractPool< ) } if ( - workerChoiceStrategyOptions.weights != null && + workerChoiceStrategyOptions?.weights != null && Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize ) { throw new Error( @@ -292,7 +300,7 @@ export abstract class AbstractPool< ) } if ( - workerChoiceStrategyOptions.measurement != null && + workerChoiceStrategyOptions?.measurement != null && !Object.values(Measurements).includes( workerChoiceStrategyOptions.measurement ) @@ -311,7 +319,7 @@ export abstract class AbstractPool< } if ( tasksQueueOptions?.concurrency != null && - !Number.isSafeInteger(tasksQueueOptions?.concurrency) + !Number.isSafeInteger(tasksQueueOptions.concurrency) ) { throw new TypeError( 'Invalid worker node tasks concurrency: must be an integer' @@ -319,23 +327,23 @@ export abstract class AbstractPool< } if ( tasksQueueOptions?.concurrency != null && - tasksQueueOptions?.concurrency <= 0 + tasksQueueOptions.concurrency <= 0 ) { throw new RangeError( - `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero` + `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` ) } if ( tasksQueueOptions?.size != null && - !Number.isSafeInteger(tasksQueueOptions?.size) + !Number.isSafeInteger(tasksQueueOptions.size) ) { throw new TypeError( 'Invalid worker node tasks queue size: must be an integer' ) } - if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) { + if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { throw new RangeError( - `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero` + `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero` ) } } @@ -640,6 +648,8 @@ export abstract class AbstractPool< tasksQueueOptions?: TasksQueueOptions ): void { if (this.opts.enableTasksQueue === true && !enable) { + this.unsetTaskStealing() + this.unsetTasksStealingOnBackPressure() this.flushTasksQueues() } this.opts.enableTasksQueue = enable @@ -653,6 +663,16 @@ export abstract class AbstractPool< this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) + if (this.opts.tasksQueueOptions.taskStealing === true) { + this.setTaskStealing() + } else { + this.unsetTaskStealing() + } + if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) { + this.setTasksStealingOnBackPressure() + } else { + this.unsetTasksStealingOnBackPressure() + } } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } @@ -664,6 +684,32 @@ export abstract class AbstractPool< } } + private setTaskStealing (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.workerNodes[workerNodeKey].onEmptyQueue = + this.taskStealingOnEmptyQueue.bind(this) + } + } + + private unsetTaskStealing (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + delete this.workerNodes[workerNodeKey].onEmptyQueue + } + } + + private setTasksStealingOnBackPressure (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.workerNodes[workerNodeKey].onBackPressure = + this.tasksStealingOnBackPressure.bind(this) + } + } + + private unsetTasksStealingOnBackPressure (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + delete this.workerNodes[workerNodeKey].onBackPressure + } + } + private buildTasksQueueOptions ( tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions {