Add a pool option to register a message listener on pool workers (#487)
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 29 Aug 2021 08:52:45 +0000 (10:52 +0200)
committerGitHub <noreply@github.com>
Sun, 29 Aug 2021 08:52:45 +0000 (10:52 +0200)
* Add a pool option to register a message listener on pool workers

It's a requirement to build bi-directionnal communications with the main
thread with poolifier.

Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
* Loosen type on MessageHandler type.

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/cluster/fixed.ts
src/pools/thread/dynamic.ts
src/pools/thread/fixed.ts
tests/pools/abstract/abstract-pool.test.js

index 6d75ac56d075338390eb9cc0191b5b6bcfe21850..c379d79556bc7d4fbcb28349536f11d5dd2e725c 100644 (file)
@@ -5,7 +5,11 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-## [2.1.0] - 2021-dd-mm
+## [2.1.0] - 2021-28-08
+
+### Added
+
+- Add an optional pool option `messageHandler` to `PoolOptions<Worker>` for registering a message handler callback on each worker.
 
 ### Breaking Changes
 
index e0aa1810c4d839f39d29eee350ae010a6945ffab..f3a8c1c6c9a7bb0a730b6b3b10dcd9a766b64e5e 100644 (file)
@@ -12,6 +12,11 @@ import {
   WorkerChoiceStrategyContext
 } from './selection-strategies'
 
+/**
+ * Callback invoked if the worker has received a message.
+ */
+export type MessageHandler<Worker> = (this: Worker, m: unknown) => void
+
 /**
  * Callback invoked if the worker raised an error.
  */
@@ -31,6 +36,13 @@ export type ExitHandler<Worker> = (this: Worker, code: number) => void
  * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
  */
 export interface IWorker {
+  /**
+   * Register a listener to the message event.
+   *
+   * @param event `'message'`.
+   * @param handler The message handler.
+   */
+  on(event: 'message', handler: MessageHandler<this>): void
   /**
    * Register a listener to the error event.
    *
@@ -65,6 +77,10 @@ export interface IWorker {
  * Options for a poolifier pool.
  */
 export interface PoolOptions<Worker> {
+  /**
+   * A function that will listen for message event on each worker.
+   */
+  messageHandler?: MessageHandler<Worker>
   /**
    * A function that will listen for error event on each worker.
    */
@@ -84,7 +100,7 @@ export interface PoolOptions<Worker> {
   /**
    * Pool events emission.
    *
-   * Default to true.
+   * @default true
    */
   enableEvents?: boolean
 }
@@ -295,7 +311,7 @@ export abstract class AbstractPool<
   protected abstract isMain (): boolean
 
   /**
-   * Increase the number of tasks that the given workers has applied.
+   * Increase the number of tasks that the given worker has applied.
    *
    * @param worker Worker whose tasks are increased.
    */
@@ -304,7 +320,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Decrease the number of tasks that the given workers has applied.
+   * Decrease the number of tasks that the given worker has applied.
    *
    * @param worker Worker whose tasks are decreased.
    */
@@ -313,7 +329,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Step the number of tasks that the given workers has applied.
+   * Step the number of tasks that the given worker has applied.
    *
    * @param worker Worker whose tasks are set.
    * @param step Worker number of tasks step.
@@ -403,6 +419,7 @@ export abstract class AbstractPool<
   protected createAndSetupWorker (): Worker {
     const worker: Worker = this.createWorker()
 
+    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
     worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
     worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
index 33c05a9220415fb43317bea9aa172b62d20207c7..ec177da666ff3e483cfcf4bf65d1cb7a65369cd7 100644 (file)
@@ -23,7 +23,7 @@ export class DynamicClusterPool<
    * @param min Minimum number of workers which are always active.
    * @param max Maximum number of workers that can be created by this pool.
    * @param filePath Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
-   * @param opts Options for this dynamic cluster pool. Default: `{}`
+   * @param [opts={}] Options for this dynamic cluster pool.
    */
   public constructor (
     min: number,
index ccffc6bba9cbb32888170cba58ca59b0fab8a8dd..95ea82b9697a12758a8118dc3ec0c308d8893427 100644 (file)
@@ -38,7 +38,7 @@ export class FixedClusterPool<
    *
    * @param numberOfWorkers Number of workers for this pool.
    * @param filePath Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
-   * @param opts Options for this fixed cluster pool. Default: `{}`
+   * @param [opts={}] Options for this fixed cluster pool.
    */
   public constructor (
     numberOfWorkers: number,
index 932a0cd5ab40dcd454f90eaf8977f30b9194c2a0..361c615c78a8724421748036900021d550c9535e 100644 (file)
@@ -24,7 +24,7 @@ export class DynamicThreadPool<
    * @param min Minimum number of threads which are always active.
    * @param max Maximum number of threads that can be created by this pool.
    * @param filePath Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
-   * @param opts Options for this dynamic thread pool. Default: `{}`
+   * @param [opts={}] Options for this dynamic thread pool.
    */
   public constructor (
     min: number,
index a13023409bfad3493f79f2757f9a1623749e8b20..e3f9602cd93cdfc3a3e03b3ab7860fe25cdb5135 100644 (file)
@@ -30,7 +30,7 @@ export class FixedThreadPool<
    *
    * @param numberOfThreads Number of threads for this pool.
    * @param filePath Path to an implementation of a `ThreadWorker` file, which can be relative or absolute.
-   * @param opts Options for this fixed thread pool. Default: `{}`
+   * @param [opts={}] Options for this fixed thread pool.
    */
   public constructor (
     numberOfThreads: number,
index df36568c9d3cfbd99689661334f6e26bddf0ae2b..b6846cc4218c48b98ca7eb4d96ddb6e3d1ad957b 100644 (file)
@@ -109,13 +109,22 @@ describe('Abstract pool test suite', () => {
     expect(pool.opts.workerChoiceStrategy).toBe(
       WorkerChoiceStrategies.ROUND_ROBIN
     )
+    expect(pool.opts.messageHandler).toBeUndefined()
+    expect(pool.opts.errorHandler).toBeUndefined()
+    expect(pool.opts.onlineHandler).toBeUndefined()
+    expect(pool.opts.exitHandler).toBeUndefined()
     pool.destroy()
+    const testHandler = () => console.log('test handler executed')
     pool = new FixedThreadPool(
       numberOfWorkers,
       './tests/worker-files/thread/testWorker.js',
       {
         workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED,
-        enableEvents: false
+        enableEvents: false,
+        messageHandler: testHandler,
+        errorHandler: testHandler,
+        onlineHandler: testHandler,
+        exitHandler: testHandler
       }
     )
     expect(pool.opts.enableEvents).toBe(false)
@@ -123,6 +132,10 @@ describe('Abstract pool test suite', () => {
     expect(pool.opts.workerChoiceStrategy).toBe(
       WorkerChoiceStrategies.LESS_RECENTLY_USED
     )
+    expect(pool.opts.messageHandler).toStrictEqual(testHandler)
+    expect(pool.opts.errorHandler).toStrictEqual(testHandler)
+    expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
+    expect(pool.opts.exitHandler).toStrictEqual(testHandler)
     pool.destroy()
   })