refactor: reorder pool options validation
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 17 Sep 2023 14:01:27 +0000 (16:01 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 17 Sep 2023 14:01:27 +0000 (16:01 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/worker/worker-options.ts
tests/pools/abstract/abstract-pool.test.js

index eb4c415b4a21be72446bb2da22db83ea9891acb7..e61711a8843228e6d4a91724ea00fdf6b4ad8ca4 100644 (file)
@@ -213,16 +213,18 @@ export abstract class AbstractPool<
   private checkPoolOptions (opts: PoolOptions<Worker>): void {
     if (isPlainObject(opts)) {
       this.opts.startWorkers = opts.startWorkers ?? true
+      this.checkValidWorkerChoiceStrategy(
+        opts.workerChoiceStrategy as WorkerChoiceStrategy
+      )
       this.opts.workerChoiceStrategy =
         opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
-      this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
+      this.checkValidWorkerChoiceStrategyOptions(
+        opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
+      )
       this.opts.workerChoiceStrategyOptions = {
         ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
         ...opts.workerChoiceStrategyOptions
       }
-      this.checkValidWorkerChoiceStrategyOptions(
-        this.opts.workerChoiceStrategyOptions
-      )
       this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
       this.opts.enableEvents = opts.enableEvents ?? true
       this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
@@ -242,7 +244,10 @@ export abstract class AbstractPool<
   private checkValidWorkerChoiceStrategy (
     workerChoiceStrategy: WorkerChoiceStrategy
   ): void {
-    if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
+    if (
+      workerChoiceStrategy != null &&
+      !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
+    ) {
       throw new Error(
         `Invalid worker choice strategy '${workerChoiceStrategy}'`
       )
@@ -252,13 +257,16 @@ export abstract class AbstractPool<
   private checkValidWorkerChoiceStrategyOptions (
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
   ): void {
-    if (!isPlainObject(workerChoiceStrategyOptions)) {
+    if (
+      workerChoiceStrategyOptions != null &&
+      !isPlainObject(workerChoiceStrategyOptions)
+    ) {
       throw new TypeError(
         'Invalid worker choice strategy options: must be a plain object'
       )
     }
     if (
-      workerChoiceStrategyOptions.retries != null &&
+      workerChoiceStrategyOptions?.retries != null &&
       !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
     ) {
       throw new TypeError(
@@ -266,7 +274,7 @@ export abstract class AbstractPool<
       )
     }
     if (
-      workerChoiceStrategyOptions.retries != null &&
+      workerChoiceStrategyOptions?.retries != null &&
       workerChoiceStrategyOptions.retries < 0
     ) {
       throw new RangeError(
@@ -274,7 +282,7 @@ export abstract class AbstractPool<
       )
     }
     if (
-      workerChoiceStrategyOptions.weights != null &&
+      workerChoiceStrategyOptions?.weights != null &&
       Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
     ) {
       throw new Error(
@@ -282,7 +290,7 @@ export abstract class AbstractPool<
       )
     }
     if (
-      workerChoiceStrategyOptions.measurement != null &&
+      workerChoiceStrategyOptions?.measurement != null &&
       !Object.values(Measurements).includes(
         workerChoiceStrategyOptions.measurement
       )
@@ -301,7 +309,7 @@ export abstract class AbstractPool<
     }
     if (
       tasksQueueOptions?.concurrency != null &&
-      !Number.isSafeInteger(tasksQueueOptions?.concurrency)
+      !Number.isSafeInteger(tasksQueueOptions.concurrency)
     ) {
       throw new TypeError(
         'Invalid worker node tasks concurrency: must be an integer'
@@ -309,23 +317,23 @@ export abstract class AbstractPool<
     }
     if (
       tasksQueueOptions?.concurrency != null &&
-      tasksQueueOptions?.concurrency <= 0
+      tasksQueueOptions.concurrency <= 0
     ) {
       throw new RangeError(
-        `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
+        `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
       )
     }
     if (
       tasksQueueOptions?.size != null &&
-      !Number.isSafeInteger(tasksQueueOptions?.size)
+      !Number.isSafeInteger(tasksQueueOptions.size)
     ) {
       throw new TypeError(
         'Invalid worker node tasks queue size: must be an integer'
       )
     }
-    if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
+    if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
       throw new RangeError(
-        `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
+        `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
       )
     }
   }
index 384b2a1104498520719c220aa9300398faf1d2bd..10589643d65764f3d68ffc41345d52e9e19250d4 100644 (file)
@@ -52,6 +52,8 @@ export interface WorkerOptions {
   maxInactiveTime?: number
   /**
    * The function to call when a worker is killed.
+   *
+   * @defaultValue `() => {}`
    */
   killHandler?: KillHandler
   /**
index aeede25d7149a9c5918626ae9e860e7733cea4b4..e9c65dde91de251b6228c303861696a9f8629162 100644 (file)
@@ -238,7 +238,7 @@ describe('Abstract pool test suite', () => {
       enableTasksQueue: true,
       tasksQueueOptions: {
         concurrency: 2,
-        size: 4,
+        size: Math.pow(numberOfWorkers, 2),
         taskStealing: true,
         tasksStealingOnBackPressure: true
       },
@@ -638,7 +638,7 @@ describe('Abstract pool test suite', () => {
     expect(pool.opts.enableTasksQueue).toBe(true)
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 1,
-      size: 4,
+      size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
       tasksStealingOnBackPressure: true
     })
@@ -650,7 +650,7 @@ describe('Abstract pool test suite', () => {
     expect(pool.opts.enableTasksQueue).toBe(true)
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 2,
-      size: 4,
+      size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
       tasksStealingOnBackPressure: true
     })
@@ -676,26 +676,33 @@ describe('Abstract pool test suite', () => {
     )
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 1,
-      size: 4,
+      size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
       tasksStealingOnBackPressure: true
     })
     for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksQueueBackPressureSize).toBe(
+        pool.opts.tasksQueueOptions.size
+      )
       expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
       expect(workerNode.onBackPressure).toBeInstanceOf(Function)
     }
     pool.setTasksQueueOptions({
       concurrency: 2,
+      size: 2,
       taskStealing: false,
       tasksStealingOnBackPressure: false
     })
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 2,
-      size: 4,
+      size: 2,
       taskStealing: false,
       tasksStealingOnBackPressure: false
     })
     for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksQueueBackPressureSize).toBe(
+        pool.opts.tasksQueueOptions.size
+      )
       expect(workerNode.onEmptyQueue).toBeUndefined()
       expect(workerNode.onBackPressure).toBeUndefined()
     }
@@ -706,11 +713,14 @@ describe('Abstract pool test suite', () => {
     })
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
       concurrency: 1,
-      size: 4,
+      size: Math.pow(numberOfWorkers, 2),
       taskStealing: true,
       tasksStealingOnBackPressure: true
     })
     for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksQueueBackPressureSize).toBe(
+        pool.opts.tasksQueueOptions.size
+      )
       expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
       expect(workerNode.onBackPressure).toBeInstanceOf(Function)
     }