X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=3d10391c17748cdd63473c4afae7630665023008;hb=f4d1dbd1592e24d7a09f35013c3e0f0762240254;hp=0cb4c30ce14ca40b3c2784ffa9cab3e65de0004b;hpb=4608939f425568da5553db1f5bf883d4cca18e02;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 0cb4c30c..3d10391c 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -223,16 +223,18 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { this.opts.startWorkers = opts.startWorkers ?? true + this.checkValidWorkerChoiceStrategy( + opts.workerChoiceStrategy as WorkerChoiceStrategy + ) this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN - this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) + this.checkValidWorkerChoiceStrategyOptions( + opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions + ) this.opts.workerChoiceStrategyOptions = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts.workerChoiceStrategyOptions } - this.checkValidWorkerChoiceStrategyOptions( - this.opts.workerChoiceStrategyOptions - ) this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false @@ -252,7 +254,10 @@ export abstract class AbstractPool< private checkValidWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy ): void { - if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) { + if ( + workerChoiceStrategy != null && + !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy) + ) { throw new Error( `Invalid worker choice strategy '${workerChoiceStrategy}'` ) @@ -262,13 +267,16 @@ export abstract class AbstractPool< private checkValidWorkerChoiceStrategyOptions ( workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { - if (!isPlainObject(workerChoiceStrategyOptions)) { + if ( + workerChoiceStrategyOptions != null && + !isPlainObject(workerChoiceStrategyOptions) + ) { throw new TypeError( 'Invalid worker choice strategy options: must be a plain object' ) } if ( - workerChoiceStrategyOptions.retries != null && + workerChoiceStrategyOptions?.retries != null && !Number.isSafeInteger(workerChoiceStrategyOptions.retries) ) { throw new TypeError( @@ -276,7 +284,7 @@ export abstract class AbstractPool< ) } if ( - workerChoiceStrategyOptions.retries != null && + workerChoiceStrategyOptions?.retries != null && workerChoiceStrategyOptions.retries < 0 ) { throw new RangeError( @@ -284,7 +292,7 @@ export abstract class AbstractPool< ) } if ( - workerChoiceStrategyOptions.weights != null && + workerChoiceStrategyOptions?.weights != null && Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize ) { throw new Error( @@ -292,7 +300,7 @@ export abstract class AbstractPool< ) } if ( - workerChoiceStrategyOptions.measurement != null && + workerChoiceStrategyOptions?.measurement != null && !Object.values(Measurements).includes( workerChoiceStrategyOptions.measurement ) @@ -311,7 +319,7 @@ export abstract class AbstractPool< } if ( tasksQueueOptions?.concurrency != null && - !Number.isSafeInteger(tasksQueueOptions?.concurrency) + !Number.isSafeInteger(tasksQueueOptions.concurrency) ) { throw new TypeError( 'Invalid worker node tasks concurrency: must be an integer' @@ -319,23 +327,23 @@ export abstract class AbstractPool< } if ( tasksQueueOptions?.concurrency != null && - tasksQueueOptions?.concurrency <= 0 + tasksQueueOptions.concurrency <= 0 ) { throw new RangeError( - `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero` + `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` ) } if ( tasksQueueOptions?.size != null && - !Number.isSafeInteger(tasksQueueOptions?.size) + !Number.isSafeInteger(tasksQueueOptions.size) ) { throw new TypeError( 'Invalid worker node tasks queue size: must be an integer' ) } - if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) { + if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { throw new RangeError( - `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero` + `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero` ) } } @@ -670,6 +678,20 @@ export abstract class AbstractPool< } } + private buildTasksQueueOptions ( + tasksQueueOptions: TasksQueueOptions + ): TasksQueueOptions { + return { + ...{ + size: Math.pow(this.maxSize, 2), + concurrency: 1, + taskStealing: true, + tasksStealingOnBackPressure: true + }, + ...tasksQueueOptions + } + } + private setTasksQueueSize (size: number): void { for (const workerNode of this.workerNodes) { workerNode.tasksQueueBackPressureSize = size @@ -702,20 +724,6 @@ export abstract class AbstractPool< } } - private buildTasksQueueOptions ( - tasksQueueOptions: TasksQueueOptions - ): TasksQueueOptions { - return { - ...{ - size: Math.pow(this.maxSize, 2), - concurrency: 1, - taskStealing: true, - tasksStealingOnBackPressure: true - }, - ...tasksQueueOptions - } - } - /** * Whether the pool is full or not. * @@ -1233,9 +1241,9 @@ export abstract class AbstractPool< this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) if ( - this.opts.restartWorkerOnError === true && this.started && - !this.starting + !this.starting && + this.opts.restartWorkerOnError === true ) { if (workerInfo.dynamic) { this.createAndSetupDynamicWorkerNode() @@ -1243,7 +1251,7 @@ export abstract class AbstractPool< this.createAndSetupWorkerNode() } } - if (this.opts.enableTasksQueue === true) { + if (this.started && this.opts.enableTasksQueue === true) { this.redistributeQueuedTasks(workerNodeKey) } }) @@ -1511,8 +1519,8 @@ export abstract class AbstractPool< ) workerInfo.ready = message.ready as boolean workerInfo.taskFunctionNames = message.taskFunctionNames - if (this.emitter != null && this.ready) { - this.emitter.emit(PoolEvents.ready, this.info) + if (this.ready) { + this.emitter?.emit(PoolEvents.ready, this.info) } }