Move worker message handler to the options argument
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 28 Aug 2021 14:12:43 +0000 (16:12 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sat, 28 Aug 2021 14:12:43 +0000 (16:12 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
package-lock.json
package.json
src/charging-station/Bootstrap.ts
src/types/Worker.ts
src/worker/WorkerAbstract.ts
src/worker/WorkerDynamicPool.ts
src/worker/WorkerFactory.ts
src/worker/WorkerSet.ts
src/worker/WorkerStaticPool.ts

index 30024ef15d6b6527bb7dfca326ad527d628dca58..202fc838bbf305836e6f32af8355495df814a514 100644 (file)
       "dev": true
     },
     "mocha": {
-      "version": "9.1.0",
-      "resolved": "https://registry.npmjs.org/mocha/-/mocha-9.1.0.tgz",
-      "integrity": "sha512-Kjg/XxYOFFUi0h/FwMOeb6RoroiZ+P1yOfya6NK7h3dNhahrJx1r2XIT3ge4ZQvJM86mdjNA+W5phqRQh7DwCg==",
+      "version": "9.1.1",
+      "resolved": "https://registry.npmjs.org/mocha/-/mocha-9.1.1.tgz",
+      "integrity": "sha512-0wE74YMgOkCgBUj8VyIDwmLUjTsS13WV1Pg7l0SHea2qzZzlq7MDnfbPsHKcELBRk3+izEVkRofjmClpycudCA==",
       "dev": true,
       "requires": {
         "@ungap/promise-all-settled": "1.1.2",
index b8cb11c257bf241db6ecdb27889fd2e9b26c6d15..d965b8386b71b64e825e926b6ceda41958aca609 100644 (file)
@@ -98,7 +98,7 @@
     "eslint-plugin-jsdoc": "^36.0.8",
     "eslint-plugin-node": "^11.1.0",
     "expect": "^27.1.0",
-    "mocha": "^9.1.0",
+    "mocha": "^9.1.1",
     "mochawesome": "^6.2.2",
     "npm-check": "^5.9.2",
     "nyc": "^15.1.0",
index 6bcc8511f487fd36db4289aeb3d82dbaa46b2e34..a982cb694f04844a55a0ff213ab3a17782f13400 100644 (file)
@@ -95,11 +95,11 @@ export default class Bootstrap {
         elementsPerWorker: Configuration.getChargingStationsPerWorker(),
         poolOptions: {
           workerChoiceStrategy: Configuration.getWorkerPoolStrategy()
-        }
-      // eslint-disable-next-line @typescript-eslint/no-misused-promises
-      }, async (msg: WorkerMessage) => {
-        if (msg.id === WorkerMessageEvents.PERFORMANCE_STATISTICS) {
-          await Bootstrap.storage.storePerformanceStatistics(msg.data);
+        },
+        messageHandler: async (msg: WorkerMessage) => {
+          if (msg.id === WorkerMessageEvents.PERFORMANCE_STATISTICS) {
+            await Bootstrap.storage.storePerformanceStatistics(msg.data);
+          }
         }
       });
   }
index c4ad7052a5dd2c37dfbb496bdd30864e28333ce2..0c795b2e7015a264636158ea583128292d4e8c81 100644 (file)
@@ -13,6 +13,7 @@ export interface WorkerOptions {
   poolMinSize?: number;
   elementsPerWorker?: number;
   poolOptions?: PoolOptions<Worker>;
+  messageHandler?: (message: any) => void | Promise<void>;
 }
 
 // eslint-disable-next-line @typescript-eslint/no-empty-interface
index d5f9a58e4a00eeed20e24d3e9605776f943f2114..a18a085e5e0a37a58b3b31127976005f3fe4f9be 100644 (file)
@@ -4,7 +4,6 @@ import { WorkerData } from '../types/Worker';
 export default abstract class WorkerAbstract {
   protected readonly workerScript: string;
   protected readonly workerStartDelay: number;
-  protected readonly messageListener: (message: any) => void;
   public abstract size: number;
   public abstract maxElementsPerWorker: number | null;
 
@@ -13,13 +12,10 @@ export default abstract class WorkerAbstract {
    *
    * @param workerScript
    * @param workerStartDelay
-   * @param messageListenerCallback
    */
-  constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY,
-      messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
+  constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY) {
     this.workerScript = workerScript;
     this.workerStartDelay = workerStartDelay;
-    this.messageListener = messageListenerCallback;
   }
 
   public abstract start(): Promise<void>;
index 0e1cd4cde877caf86cf4896fcf31271efeb4b53a..044938d432734f9762f4ffe7e4bb7244e3f74fd5 100644 (file)
@@ -17,11 +17,9 @@ export default class WorkerDynamicPool<T> extends WorkerAbstract {
    * @param max
    * @param workerStartDelay
    * @param opts
-   * @param messageListenerCallback
    */
-  constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions<Worker>,
-      messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
-    super(workerScript, workerStartDelay, messageListenerCallback);
+  constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions<Worker>) {
+    super(workerScript, workerStartDelay);
     opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler;
     this.pool = new DynamicThreadPool<WorkerData>(min, max, this.workerScript, opts);
   }
index fdfd0cb4a799b844b4bfd467bb7f6bc3c430e9b7..65d2f4d4050c761c4ffb69fe323be00449141a6a 100644 (file)
@@ -1,11 +1,12 @@
+import { Worker, isMainThread } from 'worker_threads';
 import { WorkerOptions, WorkerProcessType } from '../types/Worker';
 
 import Constants from '../utils/Constants';
+import { PoolOptions } from 'poolifier';
 import WorkerAbstract from './WorkerAbstract';
 import WorkerDynamicPool from './WorkerDynamicPool';
 import WorkerSet from './WorkerSet';
 import WorkerStaticPool from './WorkerStaticPool';
-import { isMainThread } from 'worker_threads';
 
 export default class WorkerFactory {
   // eslint-disable-next-line @typescript-eslint/no-empty-function
@@ -13,27 +14,28 @@ export default class WorkerFactory {
     // This is intentional
   }
 
-  public static getWorkerImplementation<T>(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions,
-      messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }): WorkerAbstract | null {
+  public static getWorkerImplementation<T>(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions): WorkerAbstract | null {
     if (!isMainThread) {
       throw new Error('Trying to get a worker implementation outside the main thread');
     }
     options = options ?? {} as WorkerOptions;
     options.startDelay = options.startDelay ?? Constants.WORKER_START_DELAY;
+    options.poolOptions = options?.poolOptions ?? {} as PoolOptions<Worker>;
+    // options?.messageHandler && options.poolOptions.messageHandler = options.messageHandler;
     let workerImplementation: WorkerAbstract = null;
     switch (workerProcessType) {
       case WorkerProcessType.WORKER_SET:
         options.elementsPerWorker = options.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER;
-        workerImplementation = new WorkerSet<T>(workerScript, options.elementsPerWorker, options.startDelay, messageListenerCallback);
+        workerImplementation = new WorkerSet<T>(workerScript, options.elementsPerWorker, options.startDelay, options);
         break;
       case WorkerProcessType.STATIC_POOL:
         options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
-        workerImplementation = new WorkerStaticPool<T>(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions, messageListenerCallback);
+        workerImplementation = new WorkerStaticPool<T>(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions);
         break;
       case WorkerProcessType.DYNAMIC_POOL:
         options.poolMinSize = options.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE;
         options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
-        workerImplementation = new WorkerDynamicPool<T>(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions, messageListenerCallback);
+        workerImplementation = new WorkerDynamicPool<T>(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions);
         break;
       default:
         throw new Error(`Worker implementation type '${workerProcessType}' not found`);
index 9b9ad3324540ce37dcbb828f42420535f668acf6..6ab08b92d3fbdc8ca557476ad160e25f889b7e71 100644 (file)
@@ -1,6 +1,6 @@
 // Partial Copyright Jerome Benoit. 2021. All Rights Reserved.
 
-import { WorkerMessageEvents, WorkerSetElement } from '../types/Worker';
+import { WorkerMessageEvents, WorkerOptions, WorkerSetElement } from '../types/Worker';
 
 import Utils from '../utils/Utils';
 import { Worker } from 'worker_threads';
@@ -8,7 +8,8 @@ import WorkerAbstract from './WorkerAbstract';
 import { WorkerUtils } from './WorkerUtils';
 
 export default class WorkerSet<T> extends WorkerAbstract {
-  public maxElementsPerWorker: number;
+  public readonly maxElementsPerWorker: number;
+  private readonly messageHandler: (message: any) => void | Promise<void>;
   private workerSet: Set<WorkerSetElement>;
 
   /**
@@ -17,12 +18,13 @@ export default class WorkerSet<T> extends WorkerAbstract {
    * @param workerScript
    * @param maxElementsPerWorker
    * @param workerStartDelay
-   * @param messageListenerCallback
+   * @param opts
    */
-  constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number, messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
-    super(workerScript, workerStartDelay, messageListenerCallback);
-    this.workerSet = new Set<WorkerSetElement>();
+  constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number, opts?: WorkerOptions) {
+    super(workerScript, workerStartDelay);
     this.maxElementsPerWorker = maxElementsPerWorker;
+    this.messageHandler = opts?.messageHandler ?? (() => { });
+    this.workerSet = new Set<WorkerSetElement>();
   }
 
   get size(): number {
@@ -77,7 +79,7 @@ export default class WorkerSet<T> extends WorkerAbstract {
    */
   private startWorker(): void {
     const worker = new Worker(this.workerScript);
-    worker.on('message', this.messageListener);
+    worker.on('message', this.messageHandler);
     worker.on('error', () => { /* This is intentional */ });
     worker.on('exit', (code) => {
       WorkerUtils.defaultExitHandler(code);
index d5e9222618fb1ae37cdb1cfcf563ca4936a99594..e4a8910ba8dc103b8170a21b6a15620e648a435d 100644 (file)
@@ -16,11 +16,9 @@ export default class WorkerStaticPool<T> extends WorkerAbstract {
    * @param numberOfThreads
    * @param startWorkerDelay
    * @param opts
-   * @param messageListenerCallback
    */
-  constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions<Worker>,
-      messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
-    super(workerScript, startWorkerDelay, messageListenerCallback);
+  constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions<Worker>) {
+    super(workerScript, startWorkerDelay);
     opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler;
     this.pool = new FixedThreadPool(numberOfThreads, this.workerScript, opts);
   }