fix: fix worker task functions handling
authorJérôme Benoit <jerome.benoit@sap.com>
Wed, 12 Jul 2023 10:05:56 +0000 (12:05 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Wed, 12 Jul 2023 10:05:56 +0000 (12:05 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
src/pools/abstract-pool.ts
src/worker/abstract-worker.ts
tests/test-utils.js
tests/worker/abstract-worker.test.js

index 42e602065c8365ae9b4335f684ad27b0388dd8db..ab698d75ff93227d209b8d974fd1778768476f7a 100644 (file)
@@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [2.6.15] - 2023-07-11
 
+### Fixed
+
+- Fix pool starting semantic.
+- Fix worker task functions handling.
+
 ### Added
 
 - Take into account worker node readiness in worker choice strategies.
index 3728b619720a67590f6be5c018b8c883dcd9dfc1..7d3be8d693987f80cd4a5a5c7050334ac6e304d8 100644 (file)
@@ -939,9 +939,7 @@ export abstract class AbstractPool<
       const workerNodeKey = this.getWorkerNodeKey(worker)
       const workerInfo = this.getWorkerInfo(workerNodeKey)
       workerInfo.ready = false
-      if (this.emitter != null) {
-        this.emitter.emit(PoolEvents.error, error)
-      }
+      this.emitter?.emit(PoolEvents.error, error)
       if (this.opts.restartWorkerOnError === true && !this.starting) {
         if (workerInfo.dynamic) {
           this.createAndSetupDynamicWorker()
@@ -1096,9 +1094,7 @@ export abstract class AbstractPool<
     const promiseResponse = this.promiseResponseMap.get(message.id as string)
     if (promiseResponse != null) {
       if (message.taskError != null) {
-        if (this.emitter != null) {
-          this.emitter.emit(PoolEvents.taskError, message.taskError)
-        }
+        this.emitter?.emit(PoolEvents.taskError, message.taskError)
         promiseResponse.reject(message.taskError.message)
       } else {
         promiseResponse.resolve(message.data as Response)
index 43cc4299765a155289ead75952e4c05c1fd42797..1b8a673c2a78d461288bc6c4f30fa8eb5a942285 100644 (file)
@@ -119,7 +119,15 @@ export abstract class AbstractWorker<
     }
     this.taskFunctions = new Map<string, WorkerFunction<Data, Response>>()
     if (typeof taskFunctions === 'function') {
-      this.taskFunctions.set(DEFAULT_TASK_NAME, taskFunctions.bind(this))
+      const boundFn = taskFunctions.bind(this)
+      this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
+      this.taskFunctions.set(
+        typeof taskFunctions.name === 'string' &&
+          taskFunctions.name.trim().length > 0
+          ? taskFunctions.name
+          : 'fn1',
+        boundFn
+      )
     } else if (isPlainObject(taskFunctions)) {
       let firstEntry = true
       for (const [name, fn] of Object.entries(taskFunctions)) {
@@ -133,9 +141,10 @@ export abstract class AbstractWorker<
             'A taskFunctions parameter object value is not a function'
           )
         }
-        this.taskFunctions.set(name, fn.bind(this))
+        const boundFn = fn.bind(this)
+        this.taskFunctions.set(name, boundFn)
         if (firstEntry) {
-          this.taskFunctions.set(DEFAULT_TASK_NAME, fn.bind(this))
+          this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
           firstEntry = false
         }
       }
@@ -189,14 +198,15 @@ export abstract class AbstractWorker<
     if (typeof fn !== 'function') {
       throw new TypeError('fn parameter is not a function')
     }
+    const boundFn = fn.bind(this)
     try {
       if (
         this.taskFunctions.get(name) ===
         this.taskFunctions.get(DEFAULT_TASK_NAME)
       ) {
-        this.taskFunctions.set(DEFAULT_TASK_NAME, fn.bind(this))
+        this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
       }
-      this.taskFunctions.set(name, fn.bind(this))
+      this.taskFunctions.set(name, boundFn)
       return true
     } catch {
       return false
@@ -257,10 +267,7 @@ export abstract class AbstractWorker<
     try {
       this.taskFunctions.set(
         DEFAULT_TASK_NAME,
-        this.taskFunctions.get(name)?.bind(this) as WorkerFunction<
-        Data,
-        Response
-        >
+        this.taskFunctions.get(name) as WorkerFunction<Data, Response>
       )
       return true
     } catch {
index 1fd59942c4e176a8b8a65e5d03adc5f999a93185..177a5c28e5e13e57284318fb77ea58005bdc06dd 100644 (file)
@@ -23,7 +23,7 @@ const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => {
     if (numberOfEventsToWait === 0) {
       resolve(events)
     }
-    pool.emitter.on(poolEvent, () => {
+    pool?.emitter.on(poolEvent, () => {
       ++events
       if (events === numberOfEventsToWait) {
         resolve(events)
index bf3dc28e6874cdf91f4714eee90bac11f4de75af..429fff7da79d2c3d4797e645fd3d90c9d73933c9 100644 (file)
@@ -82,6 +82,16 @@ describe('Abstract worker test suite', () => {
     )
   })
 
+  it('Verify that taskFunctions parameter with unique function is taken', () => {
+    const worker = new ThreadWorker(() => {})
+    expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.size).toBe(2)
+    expect(worker.taskFunctions.get('default')).toStrictEqual(
+      worker.taskFunctions.get('fn1')
+    )
+  })
+
   it('Verify that taskFunctions parameter with multiple task functions contains function', () => {
     const fn1 = () => {
       return 1
@@ -100,9 +110,13 @@ describe('Abstract worker test suite', () => {
       return 2
     }
     const worker = new ClusterWorker({ fn1, fn2 })
-    expect(typeof worker.taskFunctions.get('default') === 'function').toBe(true)
-    expect(typeof worker.taskFunctions.get('fn1') === 'function').toBe(true)
-    expect(typeof worker.taskFunctions.get('fn2') === 'function').toBe(true)
+    expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.size).toBe(3)
+    expect(worker.taskFunctions.get('default')).toStrictEqual(
+      worker.taskFunctions.get('fn1')
+    )
   })
 
   it('Verify that handleError() method works properly', () => {
@@ -119,4 +133,118 @@ describe('Abstract worker test suite', () => {
       new StubWorkerWithMainWorker(() => {}).getMainWorker()
     ).toThrowError('Main worker not set')
   })
+
+  it('Verify that hasTaskFunction() works', () => {
+    const fn1 = () => {
+      return 1
+    }
+    const fn2 = () => {
+      return 2
+    }
+    const worker = new ClusterWorker({ fn1, fn2 })
+    expect(worker.hasTaskFunction('default')).toBe(true)
+    expect(worker.hasTaskFunction('fn1')).toBe(true)
+    expect(worker.hasTaskFunction('fn2')).toBe(true)
+    expect(worker.hasTaskFunction('fn3')).toBe(false)
+  })
+
+  it('Verify that addTaskFunction() works', () => {
+    const fn1 = () => {
+      return 1
+    }
+    const fn2 = () => {
+      return 2
+    }
+    const fn1Replacement = () => {
+      return 3
+    }
+    const worker = new ThreadWorker(fn1)
+    expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.size).toBe(2)
+    expect(worker.taskFunctions.get('default')).toStrictEqual(
+      worker.taskFunctions.get('fn1')
+    )
+    expect(() => worker.addTaskFunction('default', fn2)).toThrowError(
+      new Error('Cannot add a task function with the default reserved name')
+    )
+    worker.addTaskFunction('fn2', fn2)
+    expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.size).toBe(3)
+    expect(worker.taskFunctions.get('default')).toStrictEqual(
+      worker.taskFunctions.get('fn1')
+    )
+    worker.addTaskFunction('fn1', fn1Replacement)
+    expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.size).toBe(3)
+    expect(worker.taskFunctions.get('default')).toStrictEqual(
+      worker.taskFunctions.get('fn1')
+    )
+  })
+
+  it('Verify that removeTaskFunction() works', () => {
+    const fn1 = () => {
+      return 1
+    }
+    const fn2 = () => {
+      return 2
+    }
+    const worker = new ThreadWorker({ fn1, fn2 })
+    expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.size).toBe(3)
+    expect(worker.taskFunctions.get('default')).toStrictEqual(
+      worker.taskFunctions.get('fn1')
+    )
+    expect(() => worker.removeTaskFunction('default')).toThrowError(
+      new Error(
+        'Cannot remove the task function with the default reserved name'
+      )
+    )
+    expect(() => worker.removeTaskFunction('fn1')).toThrowError(
+      new Error(
+        'Cannot remove the task function used as the default task function'
+      )
+    )
+    worker.removeTaskFunction('fn2')
+    expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn2')).toBeUndefined()
+    expect(worker.taskFunctions.size).toBe(2)
+  })
+
+  it('Verify that setDefaultTaskFunction() works', () => {
+    const fn1 = () => {
+      return 1
+    }
+    const fn2 = () => {
+      return 2
+    }
+    const worker = new ThreadWorker({ fn1, fn2 })
+    expect(worker.taskFunctions.get('default')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function)
+    expect(worker.taskFunctions.size).toBe(3)
+    expect(worker.taskFunctions.get('default')).toStrictEqual(
+      worker.taskFunctions.get('fn1')
+    )
+    expect(() => worker.setDefaultTaskFunction('default')).toThrowError(
+      new Error(
+        'Cannot set the default task function reserved name as the default task function'
+      )
+    )
+    worker.setDefaultTaskFunction('fn1')
+    expect(worker.taskFunctions.get('default')).toStrictEqual(
+      worker.taskFunctions.get('fn1')
+    )
+    worker.setDefaultTaskFunction('fn2')
+    expect(worker.taskFunctions.get('default')).toStrictEqual(
+      worker.taskFunctions.get('fn2')
+    )
+  })
 })