Merge pull request #747 from poolifier/multiple-functions
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 5 May 2023 14:44:07 +0000 (16:44 +0200)
committerGitHub <noreply@github.com>
Fri, 5 May 2023 14:44:07 +0000 (16:44 +0200)
23 files changed:
CHANGELOG.md
README.md
examples/multiFunctionExample.js
examples/multifunctionWorker.js
src/index.ts
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts
src/worker/worker-options.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/dynamic.test.js
tests/pools/cluster/fixed.test.js
tests/pools/thread/dynamic.test.js
tests/pools/thread/fixed.test.js
tests/worker-files/cluster/testMultiTasksWorker.js [new file with mode: 0644]
tests/worker-files/cluster/testWorker.js
tests/worker-files/thread/testMultiTasksWorker.js [new file with mode: 0644]
tests/worker-files/thread/testWorker.js
tests/worker/abstract-worker.test.js

index 3d11d6f20be71c394f5cdce00e6dbfa01e6cf8f9..b3ebe01c436563bfe2def02082c02ffa48ae9840 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Support multiple task functions per worker.
+
 ### Changed
 
 - Use O(1) queue implementation for tasks queueing.
index 857d9c26604192619b5a696ee400a43183b46e81..7767c75c1b07c28b96ffbe54a3acbb1e0369461f 100644 (file)
--- a/README.md
+++ b/README.md
@@ -139,7 +139,7 @@ pool.execute({}).then(res => {
 
 You can do the same with the classes ClusterWorker, FixedClusterPool and DynamicClusterPool.
 
-**See examples folder for more details (in particular if you want to use a pool for [multiple functions](./examples/multiFunctionExample.js))**.  
+**See examples folder for more details (in particular if you want to use a pool with [multiple worker functions](./examples/multiFunctionExample.js))**.  
 **Now TypeScript is also supported, find how to use it into the example folder**.
 
 Remember that workers can only send and receive serializable data.
@@ -209,7 +209,7 @@ This method will call the terminate method on each worker.
 
 ### `class YourWorker extends ThreadWorker/ClusterWorker`
 
-`fn` (mandatory) The function that you want to execute on the worker  
+`taskFunctions` (mandatory) The task function(s) that you want to execute on the worker  
 `opts` (optional) An object with these properties:
 
 - `maxInactiveTime` (optional) - Max time to wait tasks to work on in milliseconds, after this period the new worker will die.  
index 31fd8809613f7e2b86783a53bb16e9972128743c..213736ea037899024da9e9aab20213ebf518de40 100644 (file)
@@ -5,11 +5,11 @@ const pool = new FixedThreadPool(15, './multiFunctionWorker.js', {
 })
 
 pool
-  .execute({ functionName: 'fn0', input: 'hello' })
+  .execute({ text: 'hello' }, 'fn0')
   .then(res => console.log(res))
   .catch(err => console.error(err))
 pool
-  .execute({ functionName: 'fn1', input: 'multiple functions' })
+  .execute({ text: 'multiple functions' }, 'fn1')
   .then(res => console.log(res))
   .catch(err => console.error(err))
 
index 61369cfa57171d2cc26c5028cd170e4cd51cf9c7..53217fa04957821ec4def7bca47b4334e21bf4d0 100644 (file)
@@ -1,14 +1,14 @@
 'use strict'
 const { ThreadWorker } = require('poolifier')
 
-function yourFunction (data) {
-  if (data.functionName === 'fn0') {
-    console.log('Executing function 0')
-    return { data: '0 your input was' + data.input }
-  } else if (data.functionName === 'fn1') {
-    console.log('Executing function 1')
-    return { data: '1 your input was' + data.input }
-  }
+function fn0 (data) {
+  console.log('Executing function 0')
+  return { data: 'fn0 your input text was' + data.text }
 }
 
-module.exports = new ThreadWorker(yourFunction)
+function fn1 (data) {
+  console.log('Executing function 1')
+  return { data: 'fn1 your input text was' + data.text }
+}
+
+module.exports = new ThreadWorker({ fn0, fn1 })
index df5358aeba523fa37c8617eba17296c3a12d59d1..03ee28090ba4aef76bdd84d8c5072185487e7f5a 100644 (file)
@@ -6,8 +6,8 @@ export { PoolEvents } from './pools/pool'
 export type {
   IPool,
   PoolEmitter,
-  PoolOptions,
   PoolEvent,
+  PoolOptions,
   PoolType,
   TasksQueueOptions
 } from './pools/pool'
@@ -39,8 +39,9 @@ export { KillBehaviors } from './worker/worker-options'
 export type { KillBehavior, WorkerOptions } from './worker/worker-options'
 export type {
   Draft,
-  PromiseResponseWrapper,
   MessageValue,
+  PromiseResponseWrapper,
+  TaskFunctions,
   WorkerAsyncFunction,
   WorkerFunction,
   WorkerSyncFunction
index 06cdc3cd5d635c1e13b209228631bea7e6960fd6..edf2b29a6e9de328133735cea8a6c58b18609db0 100644 (file)
@@ -305,9 +305,10 @@ export abstract class AbstractPool<
   }
 
   /** @inheritDoc */
-  public async execute (data?: Data): Promise<Response> {
+  public async execute (data?: Data, name?: string): Promise<Response> {
     const [workerNodeKey, workerNode] = this.chooseWorkerNode()
     const submittedTask: Task<Data> = {
+      name,
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
       data: data ?? ({} as Data),
       id: crypto.randomUUID()
index f6188d8e7420cda156815baebe4821029c3b34c6..992aaaf1099c7e341daf3bd1c6a937597018b5ea 100644 (file)
@@ -144,9 +144,10 @@ export interface IPool<
    * Executes the function specified in the worker constructor with the task data input parameter.
    *
    * @param data - The task input data for the specified worker function. This can only be serializable data.
+   * @param name - The name of the worker function to execute. If not specified, the default worker function will be executed.
    * @returns Promise that will be fulfilled when the task is completed.
    */
-  execute: (data?: Data) => Promise<Response>
+  execute: (data?: Data, name?: string) => Promise<Response>
   /**
    * Shutdowns every current worker in this pool.
    */
index 716376faf54ac6925cf4d4ff2ea4804e1d2cb38e..2db5f630fefa5a1e81161ca75362e7c73fc450a1 100644 (file)
@@ -37,12 +37,16 @@ export type ExitHandler<Worker extends IWorker> = (
  * @internal
  */
 export interface Task<Data = unknown> {
+  /**
+   * Task name.
+   */
+  readonly name?: string
   /**
    * Task input data that will be passed to the worker.
    */
   readonly data?: Data
   /**
-   * UUID of the message.
+   * Message UUID.
    */
   readonly id?: string
 }
index ea90afd688d8622ee2128366c2b3e20d5302977a..331085febb79486a590ed3aff0020eb11b08f02d 100644 (file)
@@ -48,6 +48,7 @@ export interface MessageValue<
 export type WorkerSyncFunction<Data = unknown, Response = unknown> = (
   data?: Data
 ) => Response
+
 /**
  * Worker asynchronous function that can be executed.
  * This function must return a promise.
@@ -58,6 +59,7 @@ export type WorkerSyncFunction<Data = unknown, Response = unknown> = (
 export type WorkerAsyncFunction<Data = unknown, Response = unknown> = (
   data?: Data
 ) => Promise<Response>
+
 /**
  * Worker function that can be executed.
  * This function can be synchronous or asynchronous.
@@ -69,6 +71,20 @@ export type WorkerFunction<Data = unknown, Response = unknown> =
   | WorkerSyncFunction<Data, Response>
   | WorkerAsyncFunction<Data, Response>
 
+/**
+ * Worker functions that can be executed.
+ * This object can contain synchronous or asynchronous functions.
+ * The key is the name of the function.
+ * The value is the function itself.
+ *
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
+ */
+export type TaskFunctions<Data = unknown, Response = unknown> = Record<
+string,
+WorkerFunction<Data, Response>
+>
+
 /**
  * An object holding the execution response promise resolve/reject callbacks.
  *
index fbf99ab8f1dfffe8936b4f25140c3d9597772d3b..8b461f7d3bad7d462c0f4bcea900f0a3bff295dd 100644 (file)
@@ -3,6 +3,7 @@ import type { Worker } from 'node:cluster'
 import type { MessagePort } from 'node:worker_threads'
 import type {
   MessageValue,
+  TaskFunctions,
   WorkerAsyncFunction,
   WorkerFunction,
   WorkerSyncFunction
@@ -11,6 +12,7 @@ import { EMPTY_FUNCTION } from '../utils'
 import type { KillBehavior, WorkerOptions } from './worker-options'
 import { KillBehaviors } from './worker-options'
 
+const DEFAULT_FUNCTION_NAME = 'default'
 const DEFAULT_MAX_INACTIVE_TIME = 60000
 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT
 
@@ -26,6 +28,10 @@ export abstract class AbstractWorker<
   Data = unknown,
   Response = unknown
 > extends AsyncResource {
+  /**
+   * Task function(s) processed by the worker when the pool's `execution` function is invoked.
+   */
+  protected taskFunctions!: Map<string, WorkerFunction<Data, Response>>
   /**
    * Timestamp of the last task processed by this worker.
    */
@@ -39,14 +45,16 @@ export abstract class AbstractWorker<
    *
    * @param type - The type of async event.
    * @param isMain - Whether this is the main worker or not.
-   * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
+   * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
    * @param mainWorker - Reference to main worker.
    * @param opts - Options for the worker.
    */
   public constructor (
     type: string,
     protected readonly isMain: boolean,
-    fn: WorkerFunction<Data, Response>,
+    taskFunctions:
+    | WorkerFunction<Data, Response>
+    | TaskFunctions<Data, Response>,
     protected mainWorker: MainWorker | undefined | null,
     protected readonly opts: WorkerOptions = {
       /**
@@ -62,7 +70,7 @@ export abstract class AbstractWorker<
   ) {
     super(type)
     this.checkWorkerOptions(this.opts)
-    this.checkFunctionInput(fn)
+    this.checkTaskFunctions(taskFunctions)
     if (!this.isMain) {
       this.lastTaskTimestamp = performance.now()
       this.aliveInterval = setInterval(
@@ -72,12 +80,7 @@ export abstract class AbstractWorker<
       this.checkAlive.bind(this)()
     }
 
-    this.mainWorker?.on(
-      'message',
-      (message: MessageValue<Data, MainWorker>) => {
-        this.messageListener(message, fn)
-      }
-    )
+    this.mainWorker?.on('message', this.messageListener.bind(this))
   }
 
   private checkWorkerOptions (opts: WorkerOptions): void {
@@ -88,19 +91,51 @@ export abstract class AbstractWorker<
   }
 
   /**
-   * Checks if the `fn` parameter is passed to the constructor.
+   * Checks if the `taskFunctions` parameter is passed to the constructor.
    *
-   * @param fn - The function that should be defined.
+   * @param taskFunctions - The task function(s) parameter that should be checked.
    */
-  private checkFunctionInput (fn: WorkerFunction<Data, Response>): void {
-    if (fn == null) throw new Error('fn parameter is mandatory')
-    if (typeof fn !== 'function') {
-      throw new TypeError('fn parameter is not a function')
+  private checkTaskFunctions (
+    taskFunctions:
+    | WorkerFunction<Data, Response>
+    | TaskFunctions<Data, Response>
+  ): void {
+    if (taskFunctions == null) {
+      throw new Error('taskFunctions parameter is mandatory')
     }
-    if (fn.constructor.name === 'AsyncFunction' && this.opts.async === false) {
-      throw new Error(
-        'fn parameter is an async function, please set the async option to true'
-      )
+    if (
+      typeof taskFunctions !== 'function' &&
+      typeof taskFunctions !== 'object'
+    ) {
+      throw new Error('taskFunctions parameter is not a function or an object')
+    }
+    if (
+      typeof taskFunctions === 'object' &&
+      taskFunctions.constructor !== Object &&
+      Object.prototype.toString.call(taskFunctions) !== '[object Object]'
+    ) {
+      throw new Error('taskFunctions parameter is not an object literal')
+    }
+    this.taskFunctions = new Map<string, WorkerFunction<Data, Response>>()
+    if (typeof taskFunctions !== 'function') {
+      let firstEntry = true
+      for (const [name, fn] of Object.entries(taskFunctions)) {
+        if (typeof fn !== 'function') {
+          throw new Error(
+            'A taskFunctions parameter object value is not a function'
+          )
+        }
+        this.taskFunctions.set(name, fn.bind(this))
+        if (firstEntry) {
+          this.taskFunctions.set(DEFAULT_FUNCTION_NAME, fn.bind(this))
+          firstEntry = false
+        }
+      }
+      if (firstEntry) {
+        throw new Error('taskFunctions parameter object is empty')
+      }
+    } else {
+      this.taskFunctions.set(DEFAULT_FUNCTION_NAME, taskFunctions.bind(this))
     }
   }
 
@@ -108,18 +143,15 @@ export abstract class AbstractWorker<
    * Worker message listener.
    *
    * @param message - Message received.
-   * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
    */
-  protected messageListener (
-    message: MessageValue<Data, MainWorker>,
-    fn: WorkerFunction<Data, Response>
-  ): void {
+  protected messageListener (message: MessageValue<Data, MainWorker>): void {
     if (message.id != null && message.data != null) {
       // Task message received
-      if (this.opts.async === true) {
+      const fn = this.getTaskFunction(message.name)
+      if (fn?.constructor.name === 'AsyncFunction') {
         this.runInAsyncScope(this.runAsync.bind(this), this, fn, message)
       } else {
-        this.runInAsyncScope(this.run.bind(this), this, fn, message)
+        this.runInAsyncScope(this.runSync.bind(this), this, fn, message)
       }
     } else if (message.parent != null) {
       // Main worker reference message received
@@ -178,7 +210,7 @@ export abstract class AbstractWorker<
    * @param fn - Function that will be executed.
    * @param message - Input data for the given function.
    */
-  protected run (
+  protected runSync (
     fn: WorkerSyncFunction<Data, Response>,
     message: MessageValue<Data>
   ): void {
@@ -229,4 +261,18 @@ export abstract class AbstractWorker<
       })
       .catch(EMPTY_FUNCTION)
   }
+
+  /**
+   * Gets the task function in the given scope.
+   *
+   * @param name - Name of the function that will be returned.
+   */
+  private getTaskFunction (name?: string): WorkerFunction<Data, Response> {
+    name = name ?? DEFAULT_FUNCTION_NAME
+    const fn = this.taskFunctions.get(name)
+    if (fn == null) {
+      throw new Error(`Task function "${name}" not found`)
+    }
+    return fn
+  }
 }
index 8725dc89397c08d4b1e792333fd9fe08cd75ef62..49b0f984f3ca7a42721a79452da29eab34f09f22 100644 (file)
@@ -1,6 +1,10 @@
 import type { Worker } from 'node:cluster'
 import cluster from 'node:cluster'
-import type { MessageValue, WorkerFunction } from '../utility-types'
+import type {
+  MessageValue,
+  TaskFunctions,
+  WorkerFunction
+} from '../utility-types'
 import { AbstractWorker } from './abstract-worker'
 import type { WorkerOptions } from './worker-options'
 
@@ -25,17 +29,19 @@ export class ClusterWorker<
   /**
    * Constructs a new poolifier cluster worker.
    *
-   * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
+   * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked.
    * @param opts - Options for the worker.
    */
   public constructor (
-    fn: WorkerFunction<Data, Response>,
+    taskFunctions:
+    | WorkerFunction<Data, Response>
+    | TaskFunctions<Data, Response>,
     opts: WorkerOptions = {}
   ) {
     super(
       'worker-cluster-pool:poolifier',
       cluster.isPrimary,
-      fn,
+      taskFunctions,
       cluster.worker,
       opts
     )
index 03566a4c510265ddbd05f4ba24c1dbaef3c03195..75cd9da505b709a954f188ab209145221c90bc84 100644 (file)
@@ -1,6 +1,10 @@
 import type { MessagePort } from 'node:worker_threads'
 import { isMainThread, parentPort } from 'node:worker_threads'
-import type { MessageValue, WorkerFunction } from '../utility-types'
+import type {
+  MessageValue,
+  TaskFunctions,
+  WorkerFunction
+} from '../utility-types'
 import { AbstractWorker } from './abstract-worker'
 import type { WorkerOptions } from './worker-options'
 
@@ -25,14 +29,22 @@ export class ThreadWorker<
   /**
    * Constructs a new poolifier thread worker.
    *
-   * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
+   * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked.
    * @param opts - Options for the worker.
    */
   public constructor (
-    fn: WorkerFunction<Data, Response>,
+    taskFunctions:
+    | WorkerFunction<Data, Response>
+    | TaskFunctions<Data, Response>,
     opts: WorkerOptions = {}
   ) {
-    super('worker-thread-pool:poolifier', isMainThread, fn, parentPort, opts)
+    super(
+      'worker-thread-pool:poolifier',
+      isMainThread,
+      taskFunctions,
+      parentPort,
+      opts
+    )
   }
 
   /** @inheritDoc */
index 9ef50adea4f478c78344e8c9a0439cf2547ac228..791f915d580a32c822d91148db7d91f7dbf82912 100644 (file)
@@ -53,6 +53,7 @@ export interface WorkerOptions {
    * Whether your worker will perform asynchronous or not.
    *
    * @defaultValue false
+   * @deprecated This option will be removed in the next major version.
    */
   async?: boolean
   /**
index 366d4965960f6eddfb17856331fab8542b44fce3..81ae3649f1fd70f71e103a818ee51a172870bd7d 100644 (file)
@@ -1,5 +1,6 @@
 const { expect } = require('expect')
 const {
+  DynamicClusterPool,
   DynamicThreadPool,
   FixedClusterPool,
   FixedThreadPool,
@@ -398,4 +399,21 @@ describe('Abstract pool test suite', () => {
     expect(poolBusy).toBe(numberOfWorkers + 1)
     await pool.destroy()
   })
+
+  it('Verify that multiple tasks worker is working', async () => {
+    const pool = new DynamicClusterPool(
+      numberOfWorkers,
+      numberOfWorkers * 2,
+      './tests/worker-files/cluster/testMultiTasksWorker.js'
+    )
+    const data = { n: 10 }
+    const result0 = await pool.execute(data)
+    expect(result0).toBe(false)
+    const result1 = await pool.execute(data, 'jsonIntegerSerialization')
+    expect(result1).toBe(false)
+    const result2 = await pool.execute(data, 'factorial')
+    expect(result2).toBe(3628800)
+    const result3 = await pool.execute(data, 'fibonacci')
+    expect(result3).toBe(89)
+  })
 })
index e67d92bdb9849d05c09801a865e548a8208a4470..6d2bccadc1f2ae9bce01904443bb475b56ccb6a6 100644 (file)
@@ -19,11 +19,11 @@ describe('Dynamic cluster pool test suite', () => {
     let result = await pool.execute({
       function: WorkerFunctions.fibonacci
     })
-    expect(result).toBe(false)
+    expect(result).toBe(121393)
     result = await pool.execute({
       function: WorkerFunctions.factorial
     })
-    expect(result).toBe(false)
+    expect(result).toBe(9.33262154439441e157)
   })
 
   it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
index 59f76802bb97a109795e92702e470c6a670543e0..34c066100aff925579f80a8876c03a4b4d81e695 100644 (file)
@@ -65,11 +65,11 @@ describe('Fixed cluster pool test suite', () => {
     let result = await pool.execute({
       function: WorkerFunctions.fibonacci
     })
-    expect(result).toBe(false)
+    expect(result).toBe(121393)
     result = await pool.execute({
       function: WorkerFunctions.factorial
     })
-    expect(result).toBe(false)
+    expect(result).toBe(9.33262154439441e157)
   })
 
   it('Verify that is possible to invoke the execute() method without input', async () => {
index 3e5a2d354fe8d76c5d0c73013a8a2957d5ff7d52..959693901b22a6f9bda1d8d7be607cc8a40e82cc 100644 (file)
@@ -19,11 +19,11 @@ describe('Dynamic thread pool test suite', () => {
     let result = await pool.execute({
       function: WorkerFunctions.fibonacci
     })
-    expect(result).toBe(false)
+    expect(result).toBe(121393)
     result = await pool.execute({
       function: WorkerFunctions.factorial
     })
-    expect(result).toBe(false)
+    expect(result).toBe(9.33262154439441e157)
   })
 
   it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
index 9187b817d1bf749ae956d91a8afcb9436a48b3ab..09c53ea49a6afd86338679684d16e904fb2d4f75 100644 (file)
@@ -65,11 +65,11 @@ describe('Fixed thread pool test suite', () => {
     let result = await pool.execute({
       function: WorkerFunctions.fibonacci
     })
-    expect(result).toBe(false)
+    expect(result).toBe(121393)
     result = await pool.execute({
       function: WorkerFunctions.factorial
     })
-    expect(result).toBe(false)
+    expect(result).toBe(9.33262154439441e157)
   })
 
   it('Verify that is possible to invoke the execute() method without input', async () => {
diff --git a/tests/worker-files/cluster/testMultiTasksWorker.js b/tests/worker-files/cluster/testMultiTasksWorker.js
new file mode 100644 (file)
index 0000000..692a2e7
--- /dev/null
@@ -0,0 +1,23 @@
+'use strict'
+const { isMaster } = require('cluster')
+const { ClusterWorker, KillBehaviors } = require('../../../lib')
+const {
+  jsonIntegerSerialization,
+  factorial,
+  fibonacci
+} = require('../../test-utils')
+
+module.exports = new ClusterWorker(
+  {
+    jsonIntegerSerialization: data => {
+      jsonIntegerSerialization(data.n)
+      return isMaster
+    },
+    factorial: data => factorial(data.n),
+    fibonacci: data => fibonacci(data.n)
+  },
+  {
+    maxInactiveTime: 500,
+    killBehavior: KillBehaviors.HARD
+  }
+)
index 3f50d102a8364a1dad1ed6d67317f13435dd5942..0d0611d00833523fc984bec11849ddfdf61e8d86 100644 (file)
@@ -7,8 +7,11 @@ const { WorkerFunctions } = require('../../test-types')
 function test (data) {
   data = data || {}
   data.function = data.function || WorkerFunctions.jsonIntegerSerialization
-  TestUtils.executeWorkerFunction(data)
-  return isMaster
+  const result = TestUtils.executeWorkerFunction(data)
+  if (result == null) {
+    return isMaster
+  }
+  return result
 }
 
 module.exports = new ClusterWorker(test, {
diff --git a/tests/worker-files/thread/testMultiTasksWorker.js b/tests/worker-files/thread/testMultiTasksWorker.js
new file mode 100644 (file)
index 0000000..da357a2
--- /dev/null
@@ -0,0 +1,23 @@
+'use strict'
+const { isMainThread } = require('worker_threads')
+const { ThreadWorker, KillBehaviors } = require('../../../lib')
+const {
+  jsonIntegerSerialization,
+  factorial,
+  fibonacci
+} = require('../../test-utils')
+
+module.exports = new ThreadWorker(
+  {
+    jsonIntegerSerialization: data => {
+      jsonIntegerSerialization(data.n)
+      return isMainThread
+    },
+    factorial: data => factorial(data.n),
+    fibonacci: data => fibonacci(data.n)
+  },
+  {
+    maxInactiveTime: 500,
+    killBehavior: KillBehaviors.HARD
+  }
+)
index 177ef08bbe648a207cf74076ce6078dfac20c28a..668587dbf0bb85925640d9d90e90f991efde7cbf 100644 (file)
@@ -7,8 +7,11 @@ const { WorkerFunctions } = require('../../test-types')
 function test (data) {
   data = data || {}
   data.function = data.function || WorkerFunctions.jsonIntegerSerialization
-  TestUtils.executeWorkerFunction(data)
-  return isMainThread
+  const result = TestUtils.executeWorkerFunction(data)
+  if (result == null) {
+    return isMainThread
+  }
+  return result
 }
 
 module.exports = new ThreadWorker(test, {
index b62ba9425e1289fb8afc6361edc32f2be2ca9531..7f05cbb70d91d5a68f76f7bc7508fdac82acd5ea 100644 (file)
@@ -27,26 +27,56 @@ describe('Abstract worker test suite', () => {
     expect(worker.opts.async).toBe(true)
   })
 
-  it('Verify that fn parameter is mandatory', () => {
-    expect(() => new ClusterWorker()).toThrowError('fn parameter is mandatory')
+  it('Verify that taskFunctions parameter is mandatory', () => {
+    expect(() => new ClusterWorker()).toThrowError(
+      'taskFunctions parameter is mandatory'
+    )
   })
 
-  it('Verify that fn parameter is a function', () => {
-    expect(() => new ClusterWorker({})).toThrowError(
-      new TypeError('fn parameter is not a function')
+  it('Verify that taskFunctions parameter is a function or an object', () => {
+    expect(() => new ClusterWorker(0)).toThrowError(
+      new TypeError('taskFunctions parameter is not a function or an object')
     )
     expect(() => new ClusterWorker('')).toThrowError(
-      new TypeError('fn parameter is not a function')
+      new TypeError('taskFunctions parameter is not a function or an object')
+    )
+    expect(() => new ClusterWorker(true)).toThrowError(
+      new TypeError('taskFunctions parameter is not a function or an object')
     )
   })
 
-  it('Verify that async fn parameter without async option throw error', () => {
-    const fn = async () => {
-      return new Promise()
-    }
-    expect(() => new ClusterWorker(fn)).toThrowError(
-      'fn parameter is an async function, please set the async option to true'
+  it('Verify that taskFunctions parameter is not an empty object literal', () => {
+    expect(() => new ClusterWorker([])).toThrowError(
+      new TypeError('taskFunctions parameter is not an object literal')
+    )
+    expect(() => new ClusterWorker(new Map())).toThrowError(
+      new TypeError('taskFunctions parameter is not an object literal')
+    )
+    expect(() => new ClusterWorker(new Set())).toThrowError(
+      new TypeError('taskFunctions parameter is not an object literal')
+    )
+    expect(() => new ClusterWorker(new WeakMap())).toThrowError(
+      new TypeError('taskFunctions parameter is not an object literal')
     )
+    expect(() => new ClusterWorker(new WeakSet())).toThrowError(
+      new TypeError('taskFunctions parameter is not an object literal')
+    )
+    expect(() => new ClusterWorker({})).toThrowError(
+      new TypeError('taskFunctions parameter object is empty')
+    )
+  })
+
+  it('Verify that taskFunctions parameter with multiple task functions is taken', () => {
+    const fn1 = () => {
+      return 1
+    }
+    const fn2 = () => {
+      return 2
+    }
+    const worker = new ClusterWorker({ fn1, fn2 })
+    expect(typeof worker.taskFunctions.get('default') === 'function').toBe(true)
+    expect(typeof worker.taskFunctions.get('fn1') === 'function').toBe(true)
+    expect(typeof worker.taskFunctions.get('fn2') === 'function').toBe(true)
   })
 
   it('Verify that handleError() method is working properly', () => {