Switch to poolifier worker threads pool implementation.
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 25 Jan 2021 20:30:29 +0000 (21:30 +0100)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 25 Jan 2021 20:30:29 +0000 (21:30 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
17 files changed:
docker/config.json
package-lock.json
package.json
src/assets/config-template.json
src/charging-station/ChargingStation.ts
src/charging-station/StationWorker.ts
src/start.ts
src/types/ConfigurationData.ts
src/types/Worker.ts
src/utils/Configuration.ts
src/utils/Constants.ts
src/utils/Utils.ts
src/worker/WorkerDynamicPool.ts [new file with mode: 0644]
src/worker/WorkerFactory.ts
src/worker/WorkerPool.ts [deleted file]
src/worker/WorkerSet.ts
src/worker/WorkerStaticPool.ts [new file with mode: 0644]

index 84148c20d9438eb34e9d65522ac8b216869df7ca..54a9f31d98decea8000bc2cd98dabe9e6b4e87cf 100644 (file)
@@ -4,8 +4,9 @@
   ],
   "distributeStationsToTenantsEqually": true,
   "statisticsDisplayInterval": 60,
-  "useWorkerPool": false,
-  "workerMaxPoolSize": 16,
+  "workerProcess": "workerSet",
+  "workerPoolMinSize": 4,
+  "workerPoolMaxSize": 16,
   "chargingStationsPerWorker": 1,
   "stationTemplateURLs": [
     {
index 6d0fc1e5ad0ba428c4bd6048c3f65d79afea4137..db9117b0b3daf97a8c429428be5f741173eeebce 100644 (file)
       "integrity": "sha512-PACt1xdErJbMUOUweSrbVM7gSIYm1vTncW2hF6Os/EeWi6TXYAYMPp+8v6rzHmypE5gHrxaxZNXgMkJVIdZpHw==",
       "dev": true
     },
-    "@types/worker-threads-pool": {
-      "version": "2.0.0",
-      "resolved": "https://registry.npmjs.org/@types/worker-threads-pool/-/worker-threads-pool-2.0.0.tgz",
-      "integrity": "sha512-VnFtC73JBhQRtamCR4edJWxyZrwvwz6y/Ov4VNQLiirpRUkND7xo0iUNXEZP2mvZuNbvFeXGglTAFZwCDdWMTg==",
-      "dev": true,
-      "requires": {
-        "@types/node": "*"
-      }
-    },
     "@types/ws": {
       "version": "7.4.0",
       "resolved": "https://registry.npmjs.org/@types/ws/-/ws-7.4.0.tgz",
       "integrity": "sha512-OPdCF6GsMIP+Az+aWfAAOEt2/+iVDKE7oy6lJ098aoe59oAmK76qV6Gw60SbZ8jHuG2wH058GF4pLFbYamYrVA==",
       "dev": true
     },
-    "after-all": {
-      "version": "2.0.2",
-      "resolved": "https://registry.npmjs.org/after-all/-/after-all-2.0.2.tgz",
-      "integrity": "sha1-IDACmO1glLTIXJjnyK1NymKPn3M=",
-      "requires": {
-        "once": "^1.3.0"
-      }
-    },
     "ajv": {
       "version": "6.12.2",
       "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.2.tgz",
       "version": "1.4.0",
       "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz",
       "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=",
+      "dev": true,
       "requires": {
         "wrappy": "1"
       }
         "semver-compare": "^1.0.0"
       }
     },
+    "poolifier": {
+      "version": "1.2.1",
+      "resolved": "https://registry.npmjs.org/poolifier/-/poolifier-1.2.1.tgz",
+      "integrity": "sha512-kUH3JlLLO7JdAnRdtbgaSME5WDxgDzAuUk9+hapVHfXeI0VjpeuLnxLL8cUF7lEgrUE4m59scr5TFx5ajbPqXQ=="
+    },
     "posix-character-classes": {
       "version": "0.1.1",
       "resolved": "https://registry.npmjs.org/posix-character-classes/-/posix-character-classes-0.1.1.tgz",
       "integrity": "sha512-Hz/mrNwitNRh/HUAtM/VT/5VH+ygD6DV7mYKZAtHOrbs8U7lvPS6xf7EJKMF0uW1KJCl0H701g3ZGus+muE5vQ==",
       "dev": true
     },
-    "worker-threads-pool": {
-      "version": "2.0.0",
-      "resolved": "https://registry.npmjs.org/worker-threads-pool/-/worker-threads-pool-2.0.0.tgz",
-      "integrity": "sha512-5dtGbEucee6o5/kQgpyKIUoHGWf8488DP3ihZDJzDIVvH4V+NA6HdBl/I5ckI4yN1NwM68pdZDbrwac1M95mEA==",
-      "requires": {
-        "after-all": "^2.0.2"
-      }
-    },
     "wrap-ansi": {
       "version": "5.1.0",
       "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-5.1.0.tgz",
     "wrappy": {
       "version": "1.0.2",
       "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",
-      "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8="
+      "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=",
+      "dev": true
     },
     "write-file-atomic": {
       "version": "2.4.2",
index 36cea67b28334f159b0b5bae85e2818f86b4800d..1e39e3218154dce1dd0a2a662f646da1d490d8ef 100644 (file)
   },
   "dependencies": {
     "mongodb": "^3.6.3",
+    "poolifier": "^1.2.1",
     "source-map-support": "^0.5.19",
     "tslib": "^2.1.0",
     "uuid": "^8.3.2",
     "winston": "^3.3.3",
     "winston-daily-rotate-file": "^4.5.0",
-    "worker-threads-pool": "^2.0.0",
     "ws": "^7.4.2"
   },
   "optionalDependencies": {
@@ -63,7 +63,6 @@
   "devDependencies": {
     "@types/node": "^14.14.22",
     "@types/uuid": "^8.3.0",
-    "@types/worker-threads-pool": "^2.0.0",
     "@types/ws": "^7.4.0",
     "@typescript-eslint/eslint-plugin": "^4.14.0",
     "@typescript-eslint/parser": "^4.14.0",
index 9e52c6cf869d6ef96a4f10d6d872afc3298d1d9e..a7eef490e1611d0b6eabc4230ff60aa454632bc8 100644 (file)
@@ -5,7 +5,8 @@
   "distributeStationsToTenantsEqually": true,
   "statisticsDisplayInterval": 60,
   "chargingStationsPerWorker": 1,
-  "useWorkerPool": false,
+  "workerProcess": "workerSet",
+  "workerPoolMinSize": 4,
   "workerPoolMaxSize": 16,
   "stationTemplateURLs": [
     {
index 2456defb31f9556efefdf9ad21326d76f7c47553..e419d81eb38729ccf5585fc01db3238780655aa5 100644 (file)
@@ -1347,7 +1347,7 @@ export default class ChargingStation {
           await this.sendStatusNotification(transactionConnectorID, ChargePointStatus.PREPARING);
           if (commandPayload.chargingProfile && commandPayload.chargingProfile.chargingProfilePurpose === ChargingProfilePurposeType.TX_PROFILE) {
             this._setChargingProfile(transactionConnectorID, commandPayload.chargingProfile);
-          } else {
+          } else if (commandPayload.chargingProfile && commandPayload.chargingProfile.chargingProfilePurpose !== ChargingProfilePurposeType.TX_PROFILE) {
             return Constants.OCPP_RESPONSE_REJECTED;
           }
           // Authorization successful start transaction
@@ -1361,7 +1361,7 @@ export default class ChargingStation {
       await this.sendStatusNotification(transactionConnectorID, ChargePointStatus.PREPARING);
       if (commandPayload.chargingProfile && commandPayload.chargingProfile.chargingProfilePurpose === ChargingProfilePurposeType.TX_PROFILE) {
         this._setChargingProfile(transactionConnectorID, commandPayload.chargingProfile);
-      } else {
+      } else if (commandPayload.chargingProfile && commandPayload.chargingProfile.chargingProfilePurpose !== ChargingProfilePurposeType.TX_PROFILE) {
         return Constants.OCPP_RESPONSE_REJECTED;
       }
       // No local authorization check required => start transaction
index b22d8932731356198cd64818b9b3b9cc77699e19..8ddffc4e0b9abbe709d9eb2c560c719c4a8054a5 100644 (file)
@@ -1,8 +1,10 @@
+import { WorkerData, WorkerEvents } from '../types/Worker';
 import { isMainThread, parentPort, workerData } from 'worker_threads';
 
 import ChargingStation from './ChargingStation';
+import Constants from '../utils/Constants';
+import { ThreadWorker } from 'poolifier';
 import Utils from '../utils/Utils';
-import { WorkerEvents } from '../types/Worker';
 
 if (!isMainThread) {
   // Add listener to start charging station from main thread
@@ -20,7 +22,9 @@ function addListener() {
   });
 }
 
-function startChargingStation(data: any) {
-  const station = new ChargingStation(data.index as number, data.templateFile as string);
+function startChargingStation(data: WorkerData) {
+  const station = new ChargingStation(data.index , data.templateFile);
   station.start();
 }
+
+export default new ThreadWorker(startChargingStation, { maxInactiveTime: Constants.WORKER_POOL_MAX_INACTIVE_TIME, async: false });
index 7f5822225cfe260d3073a12ce536f31dde881d18..c0eab94135e65270a9c193fa1393d116faf559a9 100644 (file)
@@ -1,14 +1,15 @@
 import Configuration from './utils/Configuration';
+import Utils from './utils/Utils';
 import { WorkerData } from './types/Worker';
 import WorkerFactory from './worker/WorkerFactory';
 import Wrk from './worker/Wrk';
 
 class Bootstrap {
-  static start() {
+  static async start() {
     try {
       let numStationsTotal = 0;
       const workerImplementation: Wrk = WorkerFactory.getWorkerImpl('./dist/charging-station/StationWorker.js');
-      void workerImplementation.start();
+      await workerImplementation.start();
       // Start ChargingStation object in worker thread
       if (Configuration.getStationTemplateURLs()) {
         for (const stationURL of Configuration.getStationTemplateURLs()) {
@@ -19,7 +20,7 @@ class Bootstrap {
                 index,
                 templateFile: stationURL.file
               };
-              void workerImplementation.addElement(workerData);
+              await workerImplementation.addElement(workerData);
               numStationsTotal++;
             }
           } catch (error) {
@@ -33,7 +34,7 @@ class Bootstrap {
       if (numStationsTotal === 0) {
         console.log('No charging station template enabled in configuration, exiting');
       } else {
-        console.log(`Charging station simulator started with ${numStationsTotal.toString()} charging station(s) and ${workerImplementation.size}${Configuration.useWorkerPool() ? `/${Configuration.getWorkerPoolMaxSize().toString()}` : ''} worker(s) concurrently running (${workerImplementation.maxElementsPerWorker} charging station(s) per worker)`);
+        console.log(`Charging station simulator started with ${numStationsTotal.toString()} charging station(s) and ${workerImplementation.size}${Utils.workerPoolInUse() ? `/${Configuration.getWorkerPoolMaxSize().toString()}` : ''} worker(s) concurrently running (${workerImplementation.maxElementsPerWorker} charging station(s) per worker)`);
       }
     } catch (error) {
       // eslint-disable-next-line no-console
@@ -42,4 +43,8 @@ class Bootstrap {
   }
 }
 
-Bootstrap.start();
+Bootstrap.start().catch(
+  (error) => {
+    console.error(error);
+  }
+);
index 2c15a6f50479a545d92fa2a3ab9ee8fd3a6e8c62..125ecef636d693afc840fd16b35abde81308a0ec 100644 (file)
@@ -1,3 +1,5 @@
+import { WorkerProcessType } from './Worker';
+
 export interface StationTemplateURL {
   file: string;
   numberOfStations: number;
@@ -10,7 +12,8 @@ export default interface ConfigurationData {
   connectionTimeout?: number;
   autoReconnectMaxRetries?: number;
   distributeStationsToTenantsEqually?: boolean;
-  useWorkerPool?: boolean;
+  workerProcess?: WorkerProcessType;
+  workerPoolMinSize?: number;
   workerPoolMaxSize?: number;
   chargingStationsPerWorker?: number;
   logFormat?: string;
index bc93d76ea5c0f85514bca82b338605eb7b0675f2..20c4488224f620f68e415426384446fb1239a7ed 100644 (file)
@@ -1,5 +1,11 @@
 import { Worker } from 'worker_threads';
 
+export enum WorkerProcessType {
+  WORKER_SET = 'workerSet',
+  DYNAMIC_POOL = 'dynamicPool',
+  STATIC_POOL = 'staticPool'
+}
+
 // FIXME: make it more generic
 export interface WorkerData {
   index: number;
index 8770a9b9f3ec48cf5859d52b8a2383585a4b8351..e27ec1276539a2d7b32decec68ff9f3e6a87e39e 100644 (file)
@@ -1,5 +1,6 @@
 import ConfigurationData, { StationTemplateURL } from '../types/ConfigurationData';
 
+import { WorkerProcessType } from '../types/Worker';
 import fs from 'fs';
 
 export default class Configuration {
@@ -37,13 +38,18 @@ export default class Configuration {
     return Configuration.getConfig().stationTemplateURLs;
   }
 
-  static useWorkerPool(): boolean {
-    return Configuration.getConfig().useWorkerPool;
+  static getWorkerProcess(): WorkerProcessType {
+    Configuration.deprecateConfigurationKey('useWorkerPool;', 'Use \'workerProcess\' to define the type of worker process to use instead');
+    return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerProcess') ? Configuration.getConfig().workerProcess : WorkerProcessType.WORKER_SET;
+  }
+
+  static getWorkerPoolMinSize(): number {
+    return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMinSize') ? Configuration.getConfig().workerPoolMinSize : 4;
   }
 
   static getWorkerPoolMaxSize(): number {
     Configuration.deprecateConfigurationKey('workerPoolSize;', 'Use \'workerPoolMaxSize\' instead');
-    return Configuration.useWorkerPool() && Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMaxSize') ? Configuration.getConfig().workerPoolMaxSize : 16;
+    return Configuration.objectHasOwnProperty(Configuration.getConfig(), 'workerPoolMaxSize') ? Configuration.getConfig().workerPoolMaxSize : 16;
   }
 
   static getChargingStationsPerWorker(): number {
index c36ade65e9facfe159ba7bd3a9cc9a8d00ec81c2..35a77a69369a5220eabfbec4a90fcb7746ef8fe1 100644 (file)
@@ -38,4 +38,5 @@ export default class Constants {
   static readonly TRANSACTION_DEFAULT_TAGID = '00000000';
 
   static readonly START_WORKER_DELAY = 500;
+  static readonly WORKER_POOL_MAX_INACTIVE_TIME = 60000;
 }
index 16cb7fa3fca34cd39f803607ec6b70fb3f8dfa78..8ea05765f5c3e530329814d81bcdfd0310b800d4 100644 (file)
@@ -1,4 +1,6 @@
+import Configuration from './Configuration';
 import { WebSocketCloseEventStatusString } from '../types/WebSocket';
+import { WorkerProcessType } from '../types/Worker';
 import { v4 as uuid } from 'uuid';
 
 export default class Utils {
@@ -209,4 +211,8 @@ export default class Utils {
     }
     return '(Unknown)';
   }
+
+  static workerPoolInUse(): boolean {
+    return Configuration.getWorkerProcess() === WorkerProcessType.DYNAMIC_POOL || Configuration.getWorkerProcess() === WorkerProcessType.STATIC_POOL;
+  }
 }
diff --git a/src/worker/WorkerDynamicPool.ts b/src/worker/WorkerDynamicPool.ts
new file mode 100644 (file)
index 0000000..9567768
--- /dev/null
@@ -0,0 +1,71 @@
+import { DynamicThreadPool, DynamicThreadPoolOptions } from 'poolifier';
+
+import Constants from '../utils/Constants';
+import Utils from '../utils/Utils';
+import { WorkerData } from '../types/Worker';
+import Wrk from './Wrk';
+import { threadId } from 'worker_threads';
+
+export default class WorkerDynamicPool extends Wrk {
+  private pool: DynamicPool;
+
+  /**
+   * Create a new `WorkerDynamicPool`.
+   *
+   * @param {string} workerScript
+   */
+  constructor(workerScript: string, min: number, max: number,) {
+    super(workerScript);
+    this.pool = DynamicPool.getInstance(min, max, this.workerScript);
+  }
+
+  get size(): number {
+    return this.pool.workers.length;
+  }
+
+  get maxElementsPerWorker(): number {
+    return 1;
+  }
+
+  /**
+   *
+   * @return {Promise<void>}
+   * @public
+   */
+  // eslint-disable-next-line @typescript-eslint/no-empty-function
+  public async start(): Promise<void> { }
+
+  /**
+   *
+   * @return {Promise<void>}
+   * @public
+   */
+  public async addElement(elementData: WorkerData): Promise<void> {
+    await this.pool.execute(elementData);
+    // Start worker sequentially to optimize memory at startup
+    await Utils.sleep(Constants.START_WORKER_DELAY);
+  }
+}
+
+class DynamicPool extends DynamicThreadPool<WorkerData> {
+  private static instance: DynamicPool;
+
+  private constructor(min: number, max: number, filename: string, opts?: DynamicThreadPoolOptions) {
+    super(min, max, filename, opts);
+  }
+
+  public static getInstance(min: number, max: number, filename: string): DynamicPool {
+    if (!DynamicPool.instance) {
+      DynamicPool.instance = new DynamicPool(min, max, filename,
+        {
+          exitHandler: (code) => {
+            if (code !== 0) {
+              console.error(`Worker ${threadId} stopped with exit code ${code}`);
+            }
+          }
+        }
+      );
+    }
+    return DynamicPool.instance;
+  }
+}
index 067188d846401d13c3a76c1bdff6236442de6859..2d923daab084bb95b77a783509053bed4b519a7a 100644 (file)
@@ -1,13 +1,21 @@
 import Configuration from '../utils/Configuration';
-import WorkerPool from './WorkerPool';
+import WorkerDynamicPool from './WorkerDynamicPool';
+import { WorkerProcessType } from '../types/Worker';
 import WorkerSet from './WorkerSet';
+import WorkerStaticPool from './WorkerStaticPool';
 import Wrk from './Wrk';
 
 export default class WorkerFactory {
   public static getWorkerImpl(workerScript: string): Wrk {
-    if (Configuration.useWorkerPool()) {
-      return new WorkerPool(workerScript);
+    switch (Configuration.getWorkerProcess()) {
+      case WorkerProcessType.WORKER_SET:
+        return new WorkerSet(workerScript, Configuration.getChargingStationsPerWorker());
+      case WorkerProcessType.STATIC_POOL:
+        return new WorkerStaticPool(workerScript, Configuration.getWorkerPoolMaxSize());
+      case WorkerProcessType.DYNAMIC_POOL:
+        return new WorkerDynamicPool(workerScript, Configuration.getWorkerPoolMinSize(), Configuration.getWorkerPoolMaxSize());
+      default:
+        return null;
     }
-    return new WorkerSet(workerScript, Configuration.getChargingStationsPerWorker());
   }
 }
diff --git a/src/worker/WorkerPool.ts b/src/worker/WorkerPool.ts
deleted file mode 100644 (file)
index 5b9415c..0000000
+++ /dev/null
@@ -1,68 +0,0 @@
-import Configuration from '../utils/Configuration';
-import Constants from '../utils/Constants';
-import Pool from 'worker-threads-pool';
-import Utils from '../utils/Utils';
-import { WorkerData } from '../types/Worker';
-import Wrk from './Wrk';
-
-export default class WorkerPool extends Wrk {
-  private pool: Pool;
-
-  /**
-   * Create a new `WorkerPool`.
-   *
-   * @param {string} workerScript
-   */
-  constructor(workerScript: string) {
-    super(workerScript);
-    this.pool = UniquePool.getInstance();
-  }
-
-  get size(): number {
-    return this.pool.size;
-  }
-
-  get maxElementsPerWorker(): number {
-    return 1;
-  }
-
-  /**
-   *
-   * @return {Promise<void>}
-   * @public
-   */
-  // eslint-disable-next-line @typescript-eslint/no-empty-function
-  public async start(): Promise<void> { }
-
-  /**
-   *
-   * @return {Promise<void>}
-   * @public
-   */
-  public async addElement(elementData: WorkerData): Promise<void> {
-    return new Promise((resolve, reject) => {
-      this.pool.acquire(this.workerScript, { workerData: elementData }, (err, worker) => {
-        if (err) {
-          return reject(err);
-        }
-        worker.once('message', resolve);
-        worker.once('error', reject);
-      });
-      // Start worker sequentially to optimize memory at startup
-      void Utils.sleep(Constants.START_WORKER_DELAY);
-    });
-  }
-}
-
-class UniquePool {
-  private static instance: Pool;
-
-  private constructor() { }
-
-  public static getInstance(): Pool {
-    if (!UniquePool.instance) {
-      UniquePool.instance = new Pool({ max: Configuration.getWorkerPoolMaxSize() });
-    }
-    return UniquePool.instance;
-  }
-}
index 50d099d856633ff5cbc8ad6c5d1f57813a750b1e..2151eff9ceb51db91d6662957694f09b1c81c921 100644 (file)
@@ -1,8 +1,8 @@
+import { Worker, threadId } from 'worker_threads';
 import { WorkerData, WorkerEvents, WorkerSetElement } from '../types/Worker';
 
 import Constants from '../utils/Constants';
 import Utils from '../utils/Utils';
-import { Worker } from 'worker_threads';
 import Wrk from './Wrk';
 
 export default class WorkerSet extends Wrk {
@@ -32,12 +32,12 @@ export default class WorkerSet extends Wrk {
    */
   public async addElement(elementData: WorkerData): Promise<void> {
     if (!this.workers) {
-      throw Error('Cannot add a WorkerSet element: workers set does not exist');
+      throw Error('Cannot add a WorkerSet element: workers\' set does not exist');
     }
     if (this.getLastWorkerSetElement().numberOfWorkerElements >= this.maxElementsPerWorker) {
-      void this.startWorker();
+      this.startWorker();
       // Start worker sequentially to optimize memory at startup
-      void Utils.sleep(Constants.START_WORKER_DELAY);
+      await Utils.sleep(Constants.START_WORKER_DELAY);
     }
     this.getLastWorker().postMessage({ id: WorkerEvents.START_WORKER_ELEMENT, workerData: elementData });
     this.getLastWorkerSetElement().numberOfWorkerElements++;
@@ -49,7 +49,7 @@ export default class WorkerSet extends Wrk {
    * @public
    */
   public async start(): Promise<void> {
-    await this.startWorker();
+    this.startWorker();
     // Start worker sequentially to optimize memory at startup
     await Utils.sleep(Constants.START_WORKER_DELAY);
   }
@@ -59,18 +59,16 @@ export default class WorkerSet extends Wrk {
    * @return {Promise}
    * @private
    */
-  private async startWorker() {
-    return new Promise((resolve, reject) => {
-      const worker = new Worker(this.workerScript);
-      worker.on('message', resolve);
-      worker.on('error', reject);
-      worker.on('exit', (code) => {
-        if (code !== 0) {
-          reject(new Error(`Worker stopped with exit code ${code}`));
-        }
-      });
-      this.workers.add({ worker, numberOfWorkerElements: 0 });
+  private startWorker(): void {
+    const worker = new Worker(this.workerScript);
+    worker.on('message', () => { });
+    worker.on('error', () => { });
+    worker.on('exit', (code) => {
+      if (code !== 0) {
+        console.error(`Worker ${threadId} stopped with exit code ${code}`);
+      }
     });
+    this.workers.add({ worker, numberOfWorkerElements: 0 });
   }
 
   private getLastWorkerSetElement(): WorkerSetElement {
diff --git a/src/worker/WorkerStaticPool.ts b/src/worker/WorkerStaticPool.ts
new file mode 100644 (file)
index 0000000..20d53b0
--- /dev/null
@@ -0,0 +1,71 @@
+import { FixedThreadPool, FixedThreadPoolOptions } from 'poolifier';
+
+import Constants from '../utils/Constants';
+import Utils from '../utils/Utils';
+import { WorkerData } from '../types/Worker';
+import Wrk from './Wrk';
+import { threadId } from 'worker_threads';
+
+export default class WorkerStaticPool extends Wrk {
+  private pool: StaticPool;
+
+  /**
+   * Create a new `WorkerStaticPool`.
+   *
+   * @param {string} workerScript
+   */
+  constructor(workerScript: string, numThreads: number) {
+    super(workerScript);
+    this.pool = StaticPool.getInstance(numThreads, this.workerScript);
+  }
+
+  get size(): number {
+    return this.pool.workers.length;
+  }
+
+  get maxElementsPerWorker(): number {
+    return 1;
+  }
+
+  /**
+   *
+   * @return {Promise<void>}
+   * @public
+   */
+  // eslint-disable-next-line @typescript-eslint/no-empty-function
+  public async start(): Promise<void> { }
+
+  /**
+   *
+   * @return {Promise<void>}
+   * @public
+   */
+  public async addElement(elementData: WorkerData): Promise<void> {
+    await this.pool.execute(elementData);
+    // Start worker sequentially to optimize memory at startup
+    await Utils.sleep(Constants.START_WORKER_DELAY);
+  }
+}
+
+class StaticPool extends FixedThreadPool<WorkerData> {
+  private static instance: StaticPool;
+
+  private constructor(numThreads: number, workerScript: string, opts?: FixedThreadPoolOptions) {
+    super(numThreads, workerScript, opts);
+  }
+
+  public static getInstance(numThreads: number, workerScript: string): StaticPool {
+    if (!StaticPool.instance) {
+      StaticPool.instance = new StaticPool(numThreads, workerScript,
+        {
+          exitHandler: (code) => {
+            if (code !== 0) {
+              console.error(`Worker ${threadId} stopped with exit code ${code}`);
+            }
+          }
+        }
+      );
+    }
+    return StaticPool.instance;
+  }
+}