test: add multi tasks worker
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 5 May 2023 11:53:31 +0000 (13:53 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 5 May 2023 11:53:31 +0000 (13:53 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
13 files changed:
examples/multifunctionWorker.js
src/pools/worker.ts
src/utility-types.ts
src/worker/abstract-worker.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/dynamic.test.js
tests/pools/cluster/fixed.test.js
tests/pools/thread/dynamic.test.js
tests/pools/thread/fixed.test.js
tests/worker-files/cluster/testMultiTasksWorker.js [new file with mode: 0644]
tests/worker-files/cluster/testWorker.js
tests/worker-files/thread/testMultiTasksWorker.js [new file with mode: 0644]
tests/worker-files/thread/testWorker.js

index a38d8bb0872e82658abac89a01408d37f6983960..53217fa04957821ec4def7bca47b4334e21bf4d0 100644 (file)
@@ -3,12 +3,12 @@ const { ThreadWorker } = require('poolifier')
 
 function fn0 (data) {
   console.log('Executing function 0')
-  return { data: 'fn0 your input was' + data.text }
+  return { data: 'fn0 your input text was' + data.text }
 }
 
 function fn1 (data) {
   console.log('Executing function 1')
-  return { data: 'fn1 your input was' + data.text }
+  return { data: 'fn1 your input text was' + data.text }
 }
 
 module.exports = new ThreadWorker({ fn0, fn1 })
index 7bf86848690538010dd9acbf3cb2cee2bb411c26..2db5f630fefa5a1e81161ca75362e7c73fc450a1 100644 (file)
@@ -46,7 +46,7 @@ export interface Task<Data = unknown> {
    */
   readonly data?: Data
   /**
-   * UUID of the message.
+   * Message UUID.
    */
   readonly id?: string
 }
index 54e66e1bbe9432c1c23c2851ca0466486d5efed5..331085febb79486a590ed3aff0020eb11b08f02d 100644 (file)
@@ -48,6 +48,7 @@ export interface MessageValue<
 export type WorkerSyncFunction<Data = unknown, Response = unknown> = (
   data?: Data
 ) => Response
+
 /**
  * Worker asynchronous function that can be executed.
  * This function must return a promise.
@@ -58,6 +59,7 @@ export type WorkerSyncFunction<Data = unknown, Response = unknown> = (
 export type WorkerAsyncFunction<Data = unknown, Response = unknown> = (
   data?: Data
 ) => Promise<Response>
+
 /**
  * Worker function that can be executed.
  * This function can be synchronous or asynchronous.
@@ -68,8 +70,9 @@ export type WorkerAsyncFunction<Data = unknown, Response = unknown> = (
 export type WorkerFunction<Data = unknown, Response = unknown> =
   | WorkerSyncFunction<Data, Response>
   | WorkerAsyncFunction<Data, Response>
+
 /**
- * Worker functions that can be executed object.
+ * Worker functions that can be executed.
  * This object can contain synchronous or asynchronous functions.
  * The key is the name of the function.
  * The value is the function itself.
index 2aca9e914b2f4f296624f842ccbee9e159790815..3ca52b69841c132f2f9e399f9aabc56a473c721e 100644 (file)
@@ -148,7 +148,7 @@ export abstract class AbstractWorker<
       if (fn?.constructor.name === 'AsyncFunction') {
         this.runInAsyncScope(this.runAsync.bind(this), this, fn, message)
       } else {
-        this.runInAsyncScope(this.run.bind(this), this, fn, message)
+        this.runInAsyncScope(this.runSync.bind(this), this, fn, message)
       }
     } else if (message.parent != null) {
       // Main worker reference message received
@@ -207,7 +207,7 @@ export abstract class AbstractWorker<
    * @param fn - Function that will be executed.
    * @param message - Input data for the given function.
    */
-  protected run (
+  protected runSync (
     fn: WorkerSyncFunction<Data, Response>,
     message: MessageValue<Data>
   ): void {
index 366d4965960f6eddfb17856331fab8542b44fce3..972587b5a44a6fc6f44ae86ae0b82ec6351328de 100644 (file)
@@ -1,5 +1,6 @@
 const { expect } = require('expect')
 const {
+  DynamicClusterPool,
   DynamicThreadPool,
   FixedClusterPool,
   FixedThreadPool,
@@ -398,4 +399,19 @@ describe('Abstract pool test suite', () => {
     expect(poolBusy).toBe(numberOfWorkers + 1)
     await pool.destroy()
   })
+
+  it('Verify that multiple tasks worker is working', async () => {
+    const pool = new DynamicClusterPool(
+      numberOfWorkers,
+      numberOfWorkers * 2,
+      './tests/worker-files/cluster/testMultiTasksWorker.js'
+    )
+    const data = { n: 10 }
+    const result1 = await pool.execute(data, 'jsonIntegerSerialization')
+    expect(result1).toBe(false)
+    const result2 = await pool.execute(data, 'factorial')
+    expect(result2).toBe(3628800)
+    const result3 = await pool.execute(data, 'fibonacci')
+    expect(result3).toBe(89)
+  })
 })
index e67d92bdb9849d05c09801a865e548a8208a4470..6d2bccadc1f2ae9bce01904443bb475b56ccb6a6 100644 (file)
@@ -19,11 +19,11 @@ describe('Dynamic cluster pool test suite', () => {
     let result = await pool.execute({
       function: WorkerFunctions.fibonacci
     })
-    expect(result).toBe(false)
+    expect(result).toBe(121393)
     result = await pool.execute({
       function: WorkerFunctions.factorial
     })
-    expect(result).toBe(false)
+    expect(result).toBe(9.33262154439441e157)
   })
 
   it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
index 59f76802bb97a109795e92702e470c6a670543e0..34c066100aff925579f80a8876c03a4b4d81e695 100644 (file)
@@ -65,11 +65,11 @@ describe('Fixed cluster pool test suite', () => {
     let result = await pool.execute({
       function: WorkerFunctions.fibonacci
     })
-    expect(result).toBe(false)
+    expect(result).toBe(121393)
     result = await pool.execute({
       function: WorkerFunctions.factorial
     })
-    expect(result).toBe(false)
+    expect(result).toBe(9.33262154439441e157)
   })
 
   it('Verify that is possible to invoke the execute() method without input', async () => {
index 3e5a2d354fe8d76c5d0c73013a8a2957d5ff7d52..959693901b22a6f9bda1d8d7be607cc8a40e82cc 100644 (file)
@@ -19,11 +19,11 @@ describe('Dynamic thread pool test suite', () => {
     let result = await pool.execute({
       function: WorkerFunctions.fibonacci
     })
-    expect(result).toBe(false)
+    expect(result).toBe(121393)
     result = await pool.execute({
       function: WorkerFunctions.factorial
     })
-    expect(result).toBe(false)
+    expect(result).toBe(9.33262154439441e157)
   })
 
   it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
index 9187b817d1bf749ae956d91a8afcb9436a48b3ab..09c53ea49a6afd86338679684d16e904fb2d4f75 100644 (file)
@@ -65,11 +65,11 @@ describe('Fixed thread pool test suite', () => {
     let result = await pool.execute({
       function: WorkerFunctions.fibonacci
     })
-    expect(result).toBe(false)
+    expect(result).toBe(121393)
     result = await pool.execute({
       function: WorkerFunctions.factorial
     })
-    expect(result).toBe(false)
+    expect(result).toBe(9.33262154439441e157)
   })
 
   it('Verify that is possible to invoke the execute() method without input', async () => {
diff --git a/tests/worker-files/cluster/testMultiTasksWorker.js b/tests/worker-files/cluster/testMultiTasksWorker.js
new file mode 100644 (file)
index 0000000..692a2e7
--- /dev/null
@@ -0,0 +1,23 @@
+'use strict'
+const { isMaster } = require('cluster')
+const { ClusterWorker, KillBehaviors } = require('../../../lib')
+const {
+  jsonIntegerSerialization,
+  factorial,
+  fibonacci
+} = require('../../test-utils')
+
+module.exports = new ClusterWorker(
+  {
+    jsonIntegerSerialization: data => {
+      jsonIntegerSerialization(data.n)
+      return isMaster
+    },
+    factorial: data => factorial(data.n),
+    fibonacci: data => fibonacci(data.n)
+  },
+  {
+    maxInactiveTime: 500,
+    killBehavior: KillBehaviors.HARD
+  }
+)
index 3f50d102a8364a1dad1ed6d67317f13435dd5942..0d0611d00833523fc984bec11849ddfdf61e8d86 100644 (file)
@@ -7,8 +7,11 @@ const { WorkerFunctions } = require('../../test-types')
 function test (data) {
   data = data || {}
   data.function = data.function || WorkerFunctions.jsonIntegerSerialization
-  TestUtils.executeWorkerFunction(data)
-  return isMaster
+  const result = TestUtils.executeWorkerFunction(data)
+  if (result == null) {
+    return isMaster
+  }
+  return result
 }
 
 module.exports = new ClusterWorker(test, {
diff --git a/tests/worker-files/thread/testMultiTasksWorker.js b/tests/worker-files/thread/testMultiTasksWorker.js
new file mode 100644 (file)
index 0000000..da357a2
--- /dev/null
@@ -0,0 +1,23 @@
+'use strict'
+const { isMainThread } = require('worker_threads')
+const { ThreadWorker, KillBehaviors } = require('../../../lib')
+const {
+  jsonIntegerSerialization,
+  factorial,
+  fibonacci
+} = require('../../test-utils')
+
+module.exports = new ThreadWorker(
+  {
+    jsonIntegerSerialization: data => {
+      jsonIntegerSerialization(data.n)
+      return isMainThread
+    },
+    factorial: data => factorial(data.n),
+    fibonacci: data => fibonacci(data.n)
+  },
+  {
+    maxInactiveTime: 500,
+    killBehavior: KillBehaviors.HARD
+  }
+)
index 177ef08bbe648a207cf74076ce6078dfac20c28a..668587dbf0bb85925640d9d90e90f991efde7cbf 100644 (file)
@@ -7,8 +7,11 @@ const { WorkerFunctions } = require('../../test-types')
 function test (data) {
   data = data || {}
   data.function = data.function || WorkerFunctions.jsonIntegerSerialization
-  TestUtils.executeWorkerFunction(data)
-  return isMainThread
+  const result = TestUtils.executeWorkerFunction(data)
+  if (result == null) {
+    return isMainThread
+  }
+  return result
 }
 
 module.exports = new ThreadWorker(test, {