From 2324f8c9233f6e036eee418327c46bb7e71356b9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 17 Sep 2023 16:01:27 +0200 Subject: [PATCH] refactor: reorder pool options validation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 40 +++++++++++++--------- src/worker/worker-options.ts | 2 ++ tests/pools/abstract/abstract-pool.test.js | 22 ++++++++---- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index eb4c415b..e61711a8 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -213,16 +213,18 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { this.opts.startWorkers = opts.startWorkers ?? true + this.checkValidWorkerChoiceStrategy( + opts.workerChoiceStrategy as WorkerChoiceStrategy + ) this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN - this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) + this.checkValidWorkerChoiceStrategyOptions( + opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions + ) this.opts.workerChoiceStrategyOptions = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts.workerChoiceStrategyOptions } - this.checkValidWorkerChoiceStrategyOptions( - this.opts.workerChoiceStrategyOptions - ) this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false @@ -242,7 +244,10 @@ export abstract class AbstractPool< private checkValidWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy ): void { - if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) { + if ( + workerChoiceStrategy != null && + !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy) + ) { throw new Error( `Invalid worker choice strategy '${workerChoiceStrategy}'` ) @@ -252,13 +257,16 @@ export abstract class AbstractPool< private checkValidWorkerChoiceStrategyOptions ( workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { - if (!isPlainObject(workerChoiceStrategyOptions)) { + if ( + workerChoiceStrategyOptions != null && + !isPlainObject(workerChoiceStrategyOptions) + ) { throw new TypeError( 'Invalid worker choice strategy options: must be a plain object' ) } if ( - workerChoiceStrategyOptions.retries != null && + workerChoiceStrategyOptions?.retries != null && !Number.isSafeInteger(workerChoiceStrategyOptions.retries) ) { throw new TypeError( @@ -266,7 +274,7 @@ export abstract class AbstractPool< ) } if ( - workerChoiceStrategyOptions.retries != null && + workerChoiceStrategyOptions?.retries != null && workerChoiceStrategyOptions.retries < 0 ) { throw new RangeError( @@ -274,7 +282,7 @@ export abstract class AbstractPool< ) } if ( - workerChoiceStrategyOptions.weights != null && + workerChoiceStrategyOptions?.weights != null && Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize ) { throw new Error( @@ -282,7 +290,7 @@ export abstract class AbstractPool< ) } if ( - workerChoiceStrategyOptions.measurement != null && + workerChoiceStrategyOptions?.measurement != null && !Object.values(Measurements).includes( workerChoiceStrategyOptions.measurement ) @@ -301,7 +309,7 @@ export abstract class AbstractPool< } if ( tasksQueueOptions?.concurrency != null && - !Number.isSafeInteger(tasksQueueOptions?.concurrency) + !Number.isSafeInteger(tasksQueueOptions.concurrency) ) { throw new TypeError( 'Invalid worker node tasks concurrency: must be an integer' @@ -309,23 +317,23 @@ export abstract class AbstractPool< } if ( tasksQueueOptions?.concurrency != null && - tasksQueueOptions?.concurrency <= 0 + tasksQueueOptions.concurrency <= 0 ) { throw new RangeError( - `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero` + `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` ) } if ( tasksQueueOptions?.size != null && - !Number.isSafeInteger(tasksQueueOptions?.size) + !Number.isSafeInteger(tasksQueueOptions.size) ) { throw new TypeError( 'Invalid worker node tasks queue size: must be an integer' ) } - if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) { + if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { throw new RangeError( - `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero` + `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero` ) } } diff --git a/src/worker/worker-options.ts b/src/worker/worker-options.ts index 384b2a11..10589643 100644 --- a/src/worker/worker-options.ts +++ b/src/worker/worker-options.ts @@ -52,6 +52,8 @@ export interface WorkerOptions { maxInactiveTime?: number /** * The function to call when a worker is killed. + * + * @defaultValue `() => {}` */ killHandler?: KillHandler /** diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index aeede25d..e9c65dde 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -238,7 +238,7 @@ describe('Abstract pool test suite', () => { enableTasksQueue: true, tasksQueueOptions: { concurrency: 2, - size: 4, + size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: true }, @@ -638,7 +638,7 @@ describe('Abstract pool test suite', () => { expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1, - size: 4, + size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: true }) @@ -650,7 +650,7 @@ describe('Abstract pool test suite', () => { expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, - size: 4, + size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: true }) @@ -676,26 +676,33 @@ describe('Abstract pool test suite', () => { ) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1, - size: 4, + size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: true }) for (const workerNode of pool.workerNodes) { + expect(workerNode.tasksQueueBackPressureSize).toBe( + pool.opts.tasksQueueOptions.size + ) expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) expect(workerNode.onBackPressure).toBeInstanceOf(Function) } pool.setTasksQueueOptions({ concurrency: 2, + size: 2, taskStealing: false, tasksStealingOnBackPressure: false }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, - size: 4, + size: 2, taskStealing: false, tasksStealingOnBackPressure: false }) for (const workerNode of pool.workerNodes) { + expect(workerNode.tasksQueueBackPressureSize).toBe( + pool.opts.tasksQueueOptions.size + ) expect(workerNode.onEmptyQueue).toBeUndefined() expect(workerNode.onBackPressure).toBeUndefined() } @@ -706,11 +713,14 @@ describe('Abstract pool test suite', () => { }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1, - size: 4, + size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: true }) for (const workerNode of pool.workerNodes) { + expect(workerNode.tasksQueueBackPressureSize).toBe( + pool.opts.tasksQueueOptions.size + ) expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) expect(workerNode.onBackPressure).toBeInstanceOf(Function) } -- 2.34.1