Add configuration tunable for pool worker choice strategy
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 17 May 2021 20:13:17 +0000 (22:13 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 17 May 2021 20:13:17 +0000 (22:13 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
README.md
src/charging-station/Bootstrap.ts
src/types/ConfigurationData.ts
src/types/Worker.ts
src/utils/Configuration.ts
src/worker/WorkerDynamicPool.ts
src/worker/WorkerFactory.ts
src/worker/WorkerStaticPool.ts

index 9b5d68be4137811df8732a1196c60e3e58fa837a..e377862aadb842a8e7261d163c1768fab1fc9eb6 100644 (file)
--- a/README.md
+++ b/README.md
@@ -31,6 +31,7 @@ workerProcess | workerSet/staticPool/dynamicPool | workerSet | string | worker t
 workerStartDelay | | 500 | integer | milliseconds to wait at charging station worker threads startup
 workerPoolMinSize | | 4 | integer | worker threads pool minimum number of threads
 workerPoolMaxSize | | 16 | integer | worker threads pool maximum number of threads
+workerPoolStrategy | ROUND_ROBIN/LESS_RECENTLY_USED/... | [poolifier](https://github.com/pioardi/poolifier) default: ROUND_ROBBIN | string | worker threads pool [poolifier](https://github.com/pioardi/poolifier) worker choice strategy
 chargingStationsPerWorker | | 1 | integer | number of charging stations per worker threads for the `workerSet` process type
 logConsole | true/false | false | boolean | output logs on the console 
 logFormat | | simple | string | winston log format
index bb101ca7b75bf3f79153770a0d0dce82ef51d494..d6ebd549d9b8c169507925777300626c631de92e 100644 (file)
@@ -83,7 +83,10 @@ export default class Bootstrap {
           startDelay: Configuration.getWorkerStartDelay(),
           poolMaxSize: Configuration.getWorkerPoolMaxSize(),
           poolMinSize: Configuration.getWorkerPoolMinSize(),
-          elementsPerWorker: Configuration.getChargingStationsPerWorker()
+          elementsPerWorker: Configuration.getChargingStationsPerWorker(),
+          poolOptions: {
+            workerChoiceStrategy: Configuration.getWorkerPoolStrategy()
+          }
         });
     }
     return this.workerImplementationInstance;
index 9c7c098e2395667660f43138dd6616755bf3d072..6d67a06e741207fb670fa712e716e9aa662a33d7 100644 (file)
@@ -1,3 +1,4 @@
+import type { WorkerChoiceStrategy } from 'poolifier';
 import { WorkerProcessType } from './Worker';
 
 export interface StationTemplateURL {
@@ -16,6 +17,7 @@ export default interface ConfigurationData {
   workerStartDelay?: number;
   workerPoolMinSize?: number;
   workerPoolMaxSize?: number;
+  workerPoolStrategy?: WorkerChoiceStrategy;
   chargingStationsPerWorker?: number;
   logFormat?: string;
   logLevel?: string;
index dde976aea42278d2c3e17c63ec891e9c44849f58..c1f6f0c89ae3c197d26eb1a37c6fa3edb2c1c328 100644 (file)
@@ -1,3 +1,4 @@
+import { PoolOptions } from 'poolifier';
 import { Worker } from 'worker_threads';
 
 export enum WorkerProcessType {
@@ -11,6 +12,7 @@ export interface WorkerOptions {
   poolMaxSize?: number;
   poolMinSize?: number;
   elementsPerWorker?: number;
+  poolOptions?: PoolOptions<Worker>;
 }
 
 // eslint-disable-next-line @typescript-eslint/no-empty-interface
index 4bc1853d2d8543db22b4f8c3d35996100467dcc1..1a2b714a5fae5f0282fd95f690e5320da8341844 100644 (file)
@@ -1,6 +1,7 @@
 import ConfigurationData, { StationTemplateURL } from '../types/ConfigurationData';
 
 import Constants from './Constants';
+import type { WorkerChoiceStrategy } from 'poolifier';
 import { WorkerProcessType } from '../types/Worker';
 import fs from 'fs';
 import path from 'path';
@@ -65,6 +66,10 @@ export default class Configuration {
     return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMaxSize') ? Configuration.getConfig().workerPoolMaxSize : 16;
   }
 
+  static getWorkerPoolStrategy(): WorkerChoiceStrategy {
+    return Configuration.getConfig().workerPoolStrategy;
+  }
+
   static getChargingStationsPerWorker(): number {
     return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'chargingStationsPerWorker') ? Configuration.getConfig().chargingStationsPerWorker : 1;
   }
index cf22c0a8e865587166ec362431474c693f41e7c8..a8f689782b37a414545799c3beacb2e8de6932ac 100644 (file)
@@ -15,10 +15,11 @@ export default class WorkerDynamicPool<T> extends WorkerAbstract {
    * @param {number} min
    * @param {number} max
    * @param {number} workerStartDelay
+   * @param {PoolOptions} opts
    */
-  constructor(workerScript: string, min: number, max: number, workerStartDelay?: number) {
+  constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions<Worker>) {
     super(workerScript, workerStartDelay);
-    this.pool = DynamicPool.getInstance(min, max, this.workerScript);
+    this.pool = DynamicPool.getInstance(min, max, this.workerScript, opts);
   }
 
   get size(): number {
@@ -67,17 +68,14 @@ class DynamicPool extends DynamicThreadPool<WorkerData> {
     super(min, max, workerScript, opts);
   }
 
-  public static getInstance(min: number, max: number, workerScript: string): DynamicPool {
+  public static getInstance(min: number, max: number, workerScript: string, opts?: PoolOptions<Worker>): DynamicPool {
     if (!DynamicPool.instance) {
-      DynamicPool.instance = new DynamicPool(min, max, workerScript,
-        {
-          exitHandler: (code) => {
-            if (code !== 0) {
-              console.error(`Worker stopped with exit code ${code}`);
-            }
-          }
+      opts.exitHandler = opts.exitHandler ?? ((code) => {
+        if (code !== 0) {
+          console.error(`Worker stopped with exit code ${code}`);
         }
-      );
+      });
+      DynamicPool.instance = new DynamicPool(min, max, workerScript, opts);
     }
     return DynamicPool.instance;
   }
index 34369e186b06c03c683b4a0ece10c7e898e864b2..2cda0ca2420f3ad4ae49fcfa64dcc55d7b7a261d 100644 (file)
@@ -20,11 +20,11 @@ export default class WorkerFactory {
         return new WorkerSet<T>(workerScript, options.elementsPerWorker, options.startDelay);
       case WorkerProcessType.STATIC_POOL:
         options.poolMaxSize = options.poolMaxSize ?? 16;
-        return new WorkerStaticPool<T>(workerScript, options.poolMaxSize, options.startDelay);
+        return new WorkerStaticPool<T>(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions);
       case WorkerProcessType.DYNAMIC_POOL:
         options.poolMinSize = options.poolMinSize ?? 4;
         options.poolMaxSize = options.poolMaxSize ?? 16;
-        return new WorkerDynamicPool<T>(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay);
+        return new WorkerDynamicPool<T>(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions);
       default:
         return null;
     }
index c78768e01360a788f131029025433ee8590a5f9b..076b1ee0af596989d6d8eaa3f9cb8ba54207eaac 100644 (file)
@@ -14,10 +14,11 @@ export default class WorkerStaticPool<T> extends WorkerAbstract {
    * @param {string} workerScript
    * @param {number} numberOfThreads
    * @param {number} startWorkerDelay
+   * @param {PoolOptions} opts
    */
-  constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number) {
+  constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions<Worker>) {
     super(workerScript, startWorkerDelay);
-    this.pool = StaticPool.getInstance(numberOfThreads, this.workerScript);
+    this.pool = StaticPool.getInstance(numberOfThreads, this.workerScript, opts);
   }
 
   get size(): number {
@@ -65,17 +66,14 @@ class StaticPool extends FixedThreadPool<WorkerData> {
     super(numberOfThreads, workerScript, opts);
   }
 
-  public static getInstance(numberOfThreads: number, workerScript: string): StaticPool {
+  public static getInstance(numberOfThreads: number, workerScript: string, opts?: PoolOptions<Worker>): StaticPool {
     if (!StaticPool.instance) {
-      StaticPool.instance = new StaticPool(numberOfThreads, workerScript,
-        {
-          exitHandler: (code) => {
-            if (code !== 0) {
-              console.error(`Worker stopped with exit code ${code}`);
-            }
-          }
+      opts.exitHandler = opts.exitHandler ?? ((code) => {
+        if (code !== 0) {
+          console.error(`Worker stopped with exit code ${code}`);
         }
-      );
+      });
+      StaticPool.instance = new StaticPool(numberOfThreads, workerScript, opts);
     }
     return StaticPool.instance;
   }