fix: fix worker function type definition and validation
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 14 Apr 2023 20:00:04 +0000 (22:00 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 14 Apr 2023 20:00:04 +0000 (22:00 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/dynamic.test.js
tests/pools/cluster/fixed.test.js
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/thread/dynamic.test.js
tests/pools/thread/fixed.test.js
tests/worker/abstract-worker.test.js

index 9ba7cb1b2ea379ac8baf25c252149efdca9be63a..5efd1b843646f37ba9a0ded0916782f869ed1149 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Fix worker function type definition and validation.
+
 ## [2.4.8] - 2023-04-12
 
 ### Fixed
index 7563e97bf5cadf01b296ed1811def122d6e875dc..73eec1c6999d2aae78fa7e0a70038d30d6993997 100644 (file)
@@ -41,7 +41,7 @@ export abstract class AbstractWorker<
   public constructor (
     type: string,
     protected readonly isMain: boolean,
-    fn: (data: Data) => Response,
+    fn: (data: Data) => Response | Promise<Response>,
     protected mainWorker: MainWorker | undefined | null,
     protected readonly opts: WorkerOptions = {
       /**
@@ -56,8 +56,8 @@ export abstract class AbstractWorker<
     }
   ) {
     super(type)
-    this.checkFunctionInput(fn)
     this.checkWorkerOptions(this.opts)
+    this.checkFunctionInput(fn)
     if (!this.isMain) {
       this.lastTaskTimestamp = performance.now()
       this.aliveInterval = setInterval(
@@ -83,7 +83,7 @@ export abstract class AbstractWorker<
    */
   protected messageListener (
     message: MessageValue<Data, MainWorker>,
-    fn: (data: Data) => Response
+    fn: (data: Data) => Response | Promise<Response>
   ): void {
     if (message.id != null && message.data != null) {
       // Task message received
@@ -114,11 +114,18 @@ export abstract class AbstractWorker<
    *
    * @param fn - The function that should be defined.
    */
-  private checkFunctionInput (fn: (data: Data) => Response): void {
+  private checkFunctionInput (
+    fn: (data: Data) => Response | Promise<Response>
+  ): void {
     if (fn == null) throw new Error('fn parameter is mandatory')
     if (typeof fn !== 'function') {
       throw new TypeError('fn parameter is not a function')
     }
+    if (fn.constructor.name === 'AsyncFunction' && this.opts.async === false) {
+      throw new Error(
+        'fn parameter is an async function, please set the async option to true'
+      )
+    }
   }
 
   /**
index 655520ccedb84aeccc0c1231dfb7c4566747e2c0..1d2729812f905d9f9724e5bf9106358ae8aa893a 100644 (file)
@@ -28,7 +28,10 @@ export class ClusterWorker<
    * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
    * @param opts - Options for the worker.
    */
-  public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
+  public constructor (
+    fn: (data: Data) => Response | Promise<Response>,
+    opts: WorkerOptions = {}
+  ) {
     super(
       'worker-cluster-pool:poolifier',
       cluster.isPrimary,
index ac775a09842c970629a07ef29fa5b961a8e53a9c..ad2bc644220289c596420513208dd2b77b5db00d 100644 (file)
@@ -28,7 +28,10 @@ export class ThreadWorker<
    * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
    * @param opts - Options for the worker.
    */
-  public constructor (fn: (data: Data) => Response, opts: WorkerOptions = {}) {
+  public constructor (
+    fn: (data: Data) => Response | Promise<Response>,
+    opts: WorkerOptions = {}
+  ) {
     super('worker-thread-pool:poolifier', isMainThread, fn, parentPort, opts)
   }
 
index 1a5a75a08df6ab03d84d7fa0aff06f617f59a797..86506f671f29848f2a8f56f25943f708f886de03 100644 (file)
@@ -15,7 +15,7 @@ describe('Abstract pool test suite', () => {
   )
   class StubPoolWithRemoveAllWorker extends FixedThreadPool {
     removeAllWorker () {
-      this.workers = []
+      this.workerNodes = []
       this.promiseResponseMap.clear()
     }
   }
@@ -35,7 +35,7 @@ describe('Abstract pool test suite', () => {
             errorHandler: e => console.error(e)
           }
         )
-    ).toThrowError(new Error('Cannot start a pool from a worker!'))
+    ).toThrowError('Cannot start a pool from a worker!')
   })
 
   it('Verify that filePath is checked', () => {
@@ -52,9 +52,7 @@ describe('Abstract pool test suite', () => {
 
   it('Verify that numberOfWorkers is checked', () => {
     expect(() => new FixedThreadPool()).toThrowError(
-      new Error(
-        'Cannot instantiate a pool without specifying the number of workers'
-      )
+      'Cannot instantiate a pool without specifying the number of workers'
     )
   })
 
@@ -88,6 +86,7 @@ describe('Abstract pool test suite', () => {
     expect(pool.opts.enableEvents).toBe(true)
     expect(pool.emitter).toBeDefined()
     expect(pool.opts.enableTasksQueue).toBe(false)
+    expect(pool.opts.tasksQueueOptions).toBeUndefined()
     expect(pool.opts.workerChoiceStrategy).toBe(
       WorkerChoiceStrategies.ROUND_ROBIN
     )
@@ -108,6 +107,7 @@ describe('Abstract pool test suite', () => {
         workerChoiceStrategyOptions: { medRunTime: true },
         enableEvents: false,
         enableTasksQueue: true,
+        tasksQueueOptions: { concurrency: 2 },
         messageHandler: testHandler,
         errorHandler: testHandler,
         onlineHandler: testHandler,
@@ -117,6 +117,7 @@ describe('Abstract pool test suite', () => {
     expect(pool.opts.enableEvents).toBe(false)
     expect(pool.emitter).toBeUndefined()
     expect(pool.opts.enableTasksQueue).toBe(true)
+    expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2 })
     expect(pool.opts.workerChoiceStrategy).toBe(
       WorkerChoiceStrategies.LESS_USED
     )
@@ -130,7 +131,31 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it('Simulate worker not found during getWorkerTasksUsage', async () => {
+  it('Verify that pool options are valid', async () => {
+    expect(
+      () =>
+        new FixedThreadPool(
+          numberOfWorkers,
+          './tests/worker-files/thread/testWorker.js',
+          {
+            enableTasksQueue: true,
+            tasksQueueOptions: { concurrency: 0 }
+          }
+        )
+    ).toThrowError("Invalid worker tasks concurrency '0'")
+    expect(
+      () =>
+        new FixedThreadPool(
+          numberOfWorkers,
+          './tests/worker-files/thread/testWorker.js',
+          {
+            workerChoiceStrategy: 'invalidStrategy'
+          }
+        )
+    ).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
+  })
+
+  it('Simulate worker not found at getWorkerTasksUsage()', async () => {
     const pool = new StubPoolWithRemoveAllWorker(
       numberOfWorkers,
       './tests/worker-files/cluster/testWorker.js',
@@ -138,8 +163,10 @@ describe('Abstract pool test suite', () => {
         errorHandler: e => console.error(e)
       }
     )
+    expect(pool.workerNodes.length).toBe(numberOfWorkers)
     // Simulate worker not found.
     pool.removeAllWorker()
+    expect(pool.workerNodes.length).toBe(0)
     expect(() => pool.getWorkerTasksUsage()).toThrowError(
       workerNotFoundInPoolError
     )
index 0da467ca8685157e453872a84e0760f11aa2641e..23403ab78108620f555d8d9d8635cb06bcafd1e8 100644 (file)
@@ -66,7 +66,7 @@ describe('Dynamic cluster pool test suite', () => {
 
   it('Validation of inputs test', () => {
     expect(() => new DynamicClusterPool(min)).toThrowError(
-      new Error('Please specify a file with a worker implementation')
+      'Please specify a file with a worker implementation'
     )
   })
 
index c28b8021db11f0342ba3130146cc0e54d2b66eb5..4aaa57c750920944e6cf2c4ed02f43104cb12fa0 100644 (file)
@@ -206,6 +206,6 @@ describe('Fixed cluster pool test suite', () => {
     expect(
       () =>
         new FixedClusterPool(0, './tests/worker-files/cluster/testWorker.js')
-    ).toThrowError(new Error('Cannot instantiate a fixed pool with no worker'))
+    ).toThrowError('Cannot instantiate a fixed pool with no worker')
   })
 })
index 7755cc341f6ec97f143ba7253dae95601345de3e..2fcbba5a95054b20311baa45892e26f24733b1fe 100644 (file)
@@ -714,8 +714,6 @@ describe('Selection strategies test suite', () => {
           './tests/worker-files/thread/testWorker.js',
           { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
         )
-    ).toThrowError(
-      new Error("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
-    )
+    ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
   })
 })
index 11d91128f3dc2716ae3901b21a132e3aa10996dc..29ce01529b9a18710ef261fa87eb1d5604fb7de0 100644 (file)
@@ -66,7 +66,7 @@ describe('Dynamic thread pool test suite', () => {
 
   it('Validation of inputs test', () => {
     expect(() => new DynamicThreadPool(min)).toThrowError(
-      new Error('Please specify a file with a worker implementation')
+      'Please specify a file with a worker implementation'
     )
   })
 
index 75c4a935ab57c9e1ae1fd685a4b20183affbaf8c..6cccd6dbc6dd76166710e422f990372d9879649a 100644 (file)
@@ -186,6 +186,6 @@ describe('Fixed thread pool test suite', () => {
   it('Verify that a pool with zero worker fails', async () => {
     expect(
       () => new FixedThreadPool(0, './tests/worker-files/thread/testWorker.js')
-    ).toThrowError(new Error('Cannot instantiate a fixed pool with no worker'))
+    ).toThrowError('Cannot instantiate a fixed pool with no worker')
   })
 })
index fbc9a06d26082b007d01572e95f02f610dc1f1a5..2eae1f81bfdc0251d416882ec1d1b3eb15d2bc51 100644 (file)
@@ -9,18 +9,6 @@ describe('Abstract worker test suite', () => {
     }
   }
 
-  it('Verify that fn parameter is mandatory', () => {
-    expect(() => new ClusterWorker()).toThrowError(
-      new Error('fn parameter is mandatory')
-    )
-  })
-
-  it('Verify that fn parameter is a function', () => {
-    expect(() => new ClusterWorker({})).toThrowError(
-      new TypeError('fn parameter is not a function')
-    )
-  })
-
   it('Verify worker options default values', () => {
     const worker = new ThreadWorker(() => {})
     expect(worker.opts.maxInactiveTime).toStrictEqual(60000)
@@ -39,6 +27,28 @@ describe('Abstract worker test suite', () => {
     expect(worker.opts.async).toBe(true)
   })
 
+  it('Verify that fn parameter is mandatory', () => {
+    expect(() => new ClusterWorker()).toThrowError('fn parameter is mandatory')
+  })
+
+  it('Verify that fn parameter is a function', () => {
+    expect(() => new ClusterWorker({})).toThrowError(
+      new TypeError('fn parameter is not a function')
+    )
+    expect(() => new ClusterWorker('')).toThrowError(
+      new TypeError('fn parameter is not a function')
+    )
+  })
+
+  it('Verify that async fn parameter without async option throw error', () => {
+    const fn = async () => {
+      return new Promise()
+    }
+    expect(() => new ClusterWorker(fn)).toThrowError(
+      'fn parameter is an async function, please set the async option to true'
+    )
+  })
+
   it('Verify that handleError function is working properly', () => {
     const error = new Error('My error')
     const worker = new ThreadWorker(() => {})
@@ -48,6 +58,6 @@ describe('Abstract worker test suite', () => {
   it('Verify that get main worker throw error if main worker is not set', () => {
     expect(() =>
       new StubPoolWithIsMainWorker(() => {}).getMainWorker()
-    ).toThrowError(new Error('Main worker was not set'))
+    ).toThrowError('Main worker was not set')
   })
 })