feat: support multiple functions per worker
authorJérôme Benoit <jerome.benoit@sap.com>
Thu, 4 May 2023 20:04:27 +0000 (22:04 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Thu, 4 May 2023 20:04:27 +0000 (22:04 +0200)
Close #550

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
13 files changed:
CHANGELOG.md
README.md
examples/multiFunctionExample.js
examples/multifunctionWorker.js
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/worker.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts
src/worker/worker-options.ts
tests/worker/abstract-worker.test.js

index 631e2d66d1acbadfa75f06348b1c4995aa4b328b..b3e71fc4c75e937ae7c1d6a751fd517efd9fbef1 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- Support multiple task functions per worker.
+
 ## [2.4.11] - 2023-04-23
 
 ### Changed
index 857d9c26604192619b5a696ee400a43183b46e81..7767c75c1b07c28b96ffbe54a3acbb1e0369461f 100644 (file)
--- a/README.md
+++ b/README.md
@@ -139,7 +139,7 @@ pool.execute({}).then(res => {
 
 You can do the same with the classes ClusterWorker, FixedClusterPool and DynamicClusterPool.
 
-**See examples folder for more details (in particular if you want to use a pool for [multiple functions](./examples/multiFunctionExample.js))**.  
+**See examples folder for more details (in particular if you want to use a pool with [multiple worker functions](./examples/multiFunctionExample.js))**.  
 **Now TypeScript is also supported, find how to use it into the example folder**.
 
 Remember that workers can only send and receive serializable data.
@@ -209,7 +209,7 @@ This method will call the terminate method on each worker.
 
 ### `class YourWorker extends ThreadWorker/ClusterWorker`
 
-`fn` (mandatory) The function that you want to execute on the worker  
+`taskFunctions` (mandatory) The task function(s) that you want to execute on the worker  
 `opts` (optional) An object with these properties:
 
 - `maxInactiveTime` (optional) - Max time to wait tasks to work on in milliseconds, after this period the new worker will die.  
index 31fd8809613f7e2b86783a53bb16e9972128743c..213736ea037899024da9e9aab20213ebf518de40 100644 (file)
@@ -5,11 +5,11 @@ const pool = new FixedThreadPool(15, './multiFunctionWorker.js', {
 })
 
 pool
-  .execute({ functionName: 'fn0', input: 'hello' })
+  .execute({ text: 'hello' }, 'fn0')
   .then(res => console.log(res))
   .catch(err => console.error(err))
 pool
-  .execute({ functionName: 'fn1', input: 'multiple functions' })
+  .execute({ text: 'multiple functions' }, 'fn1')
   .then(res => console.log(res))
   .catch(err => console.error(err))
 
index 61369cfa57171d2cc26c5028cd170e4cd51cf9c7..a38d8bb0872e82658abac89a01408d37f6983960 100644 (file)
@@ -1,14 +1,14 @@
 'use strict'
 const { ThreadWorker } = require('poolifier')
 
-function yourFunction (data) {
-  if (data.functionName === 'fn0') {
-    console.log('Executing function 0')
-    return { data: '0 your input was' + data.input }
-  } else if (data.functionName === 'fn1') {
-    console.log('Executing function 1')
-    return { data: '1 your input was' + data.input }
-  }
+function fn0 (data) {
+  console.log('Executing function 0')
+  return { data: 'fn0 your input was' + data.text }
 }
 
-module.exports = new ThreadWorker(yourFunction)
+function fn1 (data) {
+  console.log('Executing function 1')
+  return { data: 'fn1 your input was' + data.text }
+}
+
+module.exports = new ThreadWorker({ fn0, fn1 })
index 2ee3a0ccb968100e23a5deecfe2eb4e049171875..298c03343fd529b982b15958328f9a5aeb5152e2 100644 (file)
@@ -304,9 +304,10 @@ export abstract class AbstractPool<
   }
 
   /** @inheritDoc */
-  public async execute (data?: Data): Promise<Response> {
+  public async execute (data?: Data, name?: string): Promise<Response> {
     const [workerNodeKey, workerNode] = this.chooseWorkerNode()
     const submittedTask: Task<Data> = {
+      name,
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
       data: data ?? ({} as Data),
       id: crypto.randomUUID()
index f6188d8e7420cda156815baebe4821029c3b34c6..992aaaf1099c7e341daf3bd1c6a937597018b5ea 100644 (file)
@@ -144,9 +144,10 @@ export interface IPool<
    * Executes the function specified in the worker constructor with the task data input parameter.
    *
    * @param data - The task input data for the specified worker function. This can only be serializable data.
+   * @param name - The name of the worker function to execute. If not specified, the default worker function will be executed.
    * @returns Promise that will be fulfilled when the task is completed.
    */
-  execute: (data?: Data) => Promise<Response>
+  execute: (data?: Data, name?: string) => Promise<Response>
   /**
    * Shutdowns every current worker in this pool.
    */
index adec0138d86e940686f1d60026616c0ce9c2c922..d2d2a56f76160d2e2618b7d92d8169919ef01cdb 100644 (file)
@@ -36,6 +36,10 @@ export type ExitHandler<Worker extends IWorker> = (
  * @internal
  */
 export interface Task<Data = unknown> {
+  /**
+   * Task name.
+   */
+  readonly name?: string
   /**
    * Task input data that will be passed to the worker.
    */
index ea90afd688d8622ee2128366c2b3e20d5302977a..5449435478a202608ad9eed8590f8a77a5ba2d1f 100644 (file)
@@ -68,6 +68,19 @@ export type WorkerAsyncFunction<Data = unknown, Response = unknown> = (
 export type WorkerFunction<Data = unknown, Response = unknown> =
   | WorkerSyncFunction<Data, Response>
   | WorkerAsyncFunction<Data, Response>
+/**
+ * Worker functions object that can be executed.
+ * This object can contain synchronous or asynchronous functions.
+ * The key is the name of the function.
+ * The value is the function itself.
+ *
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
+ */
+export type TaskFunctions<Data = unknown, Response = unknown> = Record<
+string,
+WorkerFunction<Data, Response>
+>
 
 /**
  * An object holding the execution response promise resolve/reject callbacks.
index fbf99ab8f1dfffe8936b4f25140c3d9597772d3b..788c5dfa4aabcc2c5343567dabc8be14fba5e4a2 100644 (file)
@@ -1,11 +1,12 @@
 import { AsyncResource } from 'node:async_hooks'
 import type { Worker } from 'node:cluster'
 import type { MessagePort } from 'node:worker_threads'
-import type {
-  MessageValue,
-  WorkerAsyncFunction,
-  WorkerFunction,
-  WorkerSyncFunction
+import {
+  type MessageValue,
+  type TaskFunctions,
+  type WorkerAsyncFunction,
+  type WorkerFunction,
+  type WorkerSyncFunction
 } from '../utility-types'
 import { EMPTY_FUNCTION } from '../utils'
 import type { KillBehavior, WorkerOptions } from './worker-options'
@@ -26,6 +27,10 @@ export abstract class AbstractWorker<
   Data = unknown,
   Response = unknown
 > extends AsyncResource {
+  /**
+   * Task function(s) processed by the worker when the pool's `execution` function is invoked.
+   */
+  protected taskFunctions!: Map<string, WorkerFunction<Data, Response>>
   /**
    * Timestamp of the last task processed by this worker.
    */
@@ -39,14 +44,16 @@ export abstract class AbstractWorker<
    *
    * @param type - The type of async event.
    * @param isMain - Whether this is the main worker or not.
-   * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
+   * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked.
    * @param mainWorker - Reference to main worker.
    * @param opts - Options for the worker.
    */
   public constructor (
     type: string,
     protected readonly isMain: boolean,
-    fn: WorkerFunction<Data, Response>,
+    taskFunctions:
+    | WorkerFunction<Data, Response>
+    | TaskFunctions<Data, Response>,
     protected mainWorker: MainWorker | undefined | null,
     protected readonly opts: WorkerOptions = {
       /**
@@ -62,7 +69,7 @@ export abstract class AbstractWorker<
   ) {
     super(type)
     this.checkWorkerOptions(this.opts)
-    this.checkFunctionInput(fn)
+    this.checkTaskFunctions(taskFunctions)
     if (!this.isMain) {
       this.lastTaskTimestamp = performance.now()
       this.aliveInterval = setInterval(
@@ -75,7 +82,7 @@ export abstract class AbstractWorker<
     this.mainWorker?.on(
       'message',
       (message: MessageValue<Data, MainWorker>) => {
-        this.messageListener(message, fn)
+        this.messageListener(message)
       }
     )
   }
@@ -88,19 +95,41 @@ export abstract class AbstractWorker<
   }
 
   /**
-   * Checks if the `fn` parameter is passed to the constructor.
+   * Checks if the `taskFunctions` parameter is passed to the constructor.
    *
-   * @param fn - The function that should be defined.
+   * @param taskFunctions - The task function(s) that should be defined.
    */
-  private checkFunctionInput (fn: WorkerFunction<Data, Response>): void {
-    if (fn == null) throw new Error('fn parameter is mandatory')
-    if (typeof fn !== 'function') {
-      throw new TypeError('fn parameter is not a function')
+  private checkTaskFunctions (
+    taskFunctions:
+    | WorkerFunction<Data, Response>
+    | TaskFunctions<Data, Response>
+  ): void {
+    if (taskFunctions == null) { throw new Error('taskFunctions parameter is mandatory') }
+    if (
+      typeof taskFunctions !== 'function' &&
+      typeof taskFunctions !== 'object'
+    ) {
+      throw new Error('taskFunctions parameter is not a function or an object')
     }
-    if (fn.constructor.name === 'AsyncFunction' && this.opts.async === false) {
-      throw new Error(
-        'fn parameter is an async function, please set the async option to true'
-      )
+    if (
+      typeof taskFunctions === 'object' &&
+      taskFunctions.constructor !== Object &&
+      Object.prototype.toString.call(taskFunctions) !== '[object Object]'
+    ) {
+      throw new Error('taskFunctions parameter is not an object literal')
+    }
+    this.taskFunctions = new Map<string, WorkerFunction<Data, Response>>()
+    if (typeof taskFunctions !== 'function') {
+      for (const [name, fn] of Object.entries(taskFunctions)) {
+        if (typeof fn !== 'function') {
+          throw new Error(
+            'A taskFunctions parameter object value is not a function'
+          )
+        }
+        this.taskFunctions.set(name, fn.bind(this))
+      }
+    } else {
+      this.taskFunctions.set('default', taskFunctions.bind(this))
     }
   }
 
@@ -108,15 +137,17 @@ export abstract class AbstractWorker<
    * Worker message listener.
    *
    * @param message - Message received.
-   * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
    */
-  protected messageListener (
-    message: MessageValue<Data, MainWorker>,
-    fn: WorkerFunction<Data, Response>
-  ): void {
+  protected messageListener (message: MessageValue<Data, MainWorker>): void {
     if (message.id != null && message.data != null) {
+      let fn: WorkerFunction<Data, Response> | undefined
+      if (message.name == null) {
+        fn = this.taskFunctions.get('default')
+      } else {
+        fn = this.taskFunctions.get(message.name)
+      }
       // Task message received
-      if (this.opts.async === true) {
+      if (fn?.constructor.name === 'AsyncFunction') {
         this.runInAsyncScope(this.runAsync.bind(this), this, fn, message)
       } else {
         this.runInAsyncScope(this.run.bind(this), this, fn, message)
index 8725dc89397c08d4b1e792333fd9fe08cd75ef62..49b0f984f3ca7a42721a79452da29eab34f09f22 100644 (file)
@@ -1,6 +1,10 @@
 import type { Worker } from 'node:cluster'
 import cluster from 'node:cluster'
-import type { MessageValue, WorkerFunction } from '../utility-types'
+import type {
+  MessageValue,
+  TaskFunctions,
+  WorkerFunction
+} from '../utility-types'
 import { AbstractWorker } from './abstract-worker'
 import type { WorkerOptions } from './worker-options'
 
@@ -25,17 +29,19 @@ export class ClusterWorker<
   /**
    * Constructs a new poolifier cluster worker.
    *
-   * @param fn - Function processed by the worker when the pool's `execution` function is invoked.
+   * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked.
    * @param opts - Options for the worker.
    */
   public constructor (
-    fn: WorkerFunction<Data, Response>,
+    taskFunctions:
+    | WorkerFunction<Data, Response>
+    | TaskFunctions<Data, Response>,
     opts: WorkerOptions = {}
   ) {
     super(
       'worker-cluster-pool:poolifier',
       cluster.isPrimary,
-      fn,
+      taskFunctions,
       cluster.worker,
       opts
     )
index 03566a4c510265ddbd05f4ba24c1dbaef3c03195..ce17c59a19c0644f3b2eb188e712e29447c0c773 100644 (file)
@@ -1,6 +1,10 @@
 import type { MessagePort } from 'node:worker_threads'
 import { isMainThread, parentPort } from 'node:worker_threads'
-import type { MessageValue, WorkerFunction } from '../utility-types'
+import type {
+  MessageValue,
+  TaskFunctions,
+  WorkerFunction
+} from '../utility-types'
 import { AbstractWorker } from './abstract-worker'
 import type { WorkerOptions } from './worker-options'
 
@@ -29,7 +33,7 @@ export class ThreadWorker<
    * @param opts - Options for the worker.
    */
   public constructor (
-    fn: WorkerFunction<Data, Response>,
+    fn: WorkerFunction<Data, Response> | TaskFunctions<Data, Response>,
     opts: WorkerOptions = {}
   ) {
     super('worker-thread-pool:poolifier', isMainThread, fn, parentPort, opts)
index 9ef50adea4f478c78344e8c9a0439cf2547ac228..791f915d580a32c822d91148db7d91f7dbf82912 100644 (file)
@@ -53,6 +53,7 @@ export interface WorkerOptions {
    * Whether your worker will perform asynchronous or not.
    *
    * @defaultValue false
+   * @deprecated This option will be removed in the next major version.
    */
   async?: boolean
   /**
index b62ba9425e1289fb8afc6361edc32f2be2ca9531..25e097234f439dc6ec68f18903440c21e95f1c10 100644 (file)
@@ -27,26 +27,52 @@ describe('Abstract worker test suite', () => {
     expect(worker.opts.async).toBe(true)
   })
 
-  it('Verify that fn parameter is mandatory', () => {
-    expect(() => new ClusterWorker()).toThrowError('fn parameter is mandatory')
+  it('Verify that taskFunctions parameter is mandatory', () => {
+    expect(() => new ClusterWorker()).toThrowError(
+      'taskFunctions parameter is mandatory'
+    )
   })
 
-  it('Verify that fn parameter is a function', () => {
-    expect(() => new ClusterWorker({})).toThrowError(
-      new TypeError('fn parameter is not a function')
+  it('Verify that taskFunctions parameter is a function or an object', () => {
+    expect(() => new ClusterWorker(0)).toThrowError(
+      new TypeError('taskFunctions parameter is not a function or an object')
     )
     expect(() => new ClusterWorker('')).toThrowError(
-      new TypeError('fn parameter is not a function')
+      new TypeError('taskFunctions parameter is not a function or an object')
+    )
+    expect(() => new ClusterWorker(true)).toThrowError(
+      new TypeError('taskFunctions parameter is not a function or an object')
     )
   })
 
-  it('Verify that async fn parameter without async option throw error', () => {
-    const fn = async () => {
-      return new Promise()
-    }
-    expect(() => new ClusterWorker(fn)).toThrowError(
-      'fn parameter is an async function, please set the async option to true'
+  it('Verify that taskFunctions parameter is an object literal', () => {
+    expect(() => new ClusterWorker([])).toThrowError(
+      new TypeError('taskFunctions parameter is not an object literal')
+    )
+    expect(() => new ClusterWorker(new Map())).toThrowError(
+      new TypeError('taskFunctions parameter is not an object literal')
+    )
+    expect(() => new ClusterWorker(new Set())).toThrowError(
+      new TypeError('taskFunctions parameter is not an object literal')
+    )
+    expect(() => new ClusterWorker(new WeakMap())).toThrowError(
+      new TypeError('taskFunctions parameter is not an object literal')
     )
+    expect(() => new ClusterWorker(new WeakSet())).toThrowError(
+      new TypeError('taskFunctions parameter is not an object literal')
+    )
+  })
+
+  it('Verify that taskFunctions parameter with multiple task functions is taken', () => {
+    const fn1 = () => {
+      return 1
+    }
+    const fn2 = () => {
+      return 2
+    }
+    const worker = new ClusterWorker({ fn1, fn2 })
+    expect(typeof worker.taskFunctions.get('fn1') === 'function').toBe(true)
+    expect(typeof worker.taskFunctions.get('fn2') === 'function').toBe(true)
   })
 
   it('Verify that handleError() method is working properly', () => {