]> Piment Noir Git Repositories - e-mobility-charging-stations-simulator.git/commitdiff
refactor: make worker set code more robust
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 18 Aug 2025 21:11:41 +0000 (23:11 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 18 Aug 2025 21:11:41 +0000 (23:11 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/charging-station/ChargingStationWorker.ts
src/utils/Constants.ts
src/worker/WorkerAbstract.ts
src/worker/WorkerConstants.ts
src/worker/WorkerDynamicPool.ts
src/worker/WorkerFixedPool.ts
src/worker/WorkerSet.ts
src/worker/WorkerTypes.ts

index fd9717c412db5a021bba0a6ab0d8a4ef1eeb89c0..73c816a32aac597f68b92ba16e440a10483fa1a3 100644 (file)
@@ -10,7 +10,9 @@ import { Configuration } from '../utils/index.js'
 import { type WorkerDataError, type WorkerMessage, WorkerMessageEvents } from '../worker/index.js'
 import { ChargingStation } from './ChargingStation.js'
 
-export let chargingStationWorker: object
+export let chargingStationWorker:
+  | object
+  | ThreadWorker<ChargingStationWorkerData, ChargingStationInfo>
 if (Configuration.workerPoolInUse()) {
   chargingStationWorker = new ThreadWorker<
     ChargingStationWorkerData,
index 37025b3378d84a5b225e4b7e4451dfb797f4165b..9732d386d7629d8cb722d8c6f7464e69f5d916b3 100644 (file)
@@ -10,7 +10,7 @@ import {
 
 // eslint-disable-next-line @typescript-eslint/no-extraneous-class
 export class Constants {
-  static readonly DEFAULT_ATG_CONFIGURATION: AutomaticTransactionGeneratorConfiguration =
+  static readonly DEFAULT_ATG_CONFIGURATION: Readonly<AutomaticTransactionGeneratorConfiguration> =
     Object.freeze({
       enable: false,
       maxDelayBetweenTwoTransactions: 30,
@@ -27,7 +27,7 @@ export class Constants {
   static readonly DEFAULT_BOOT_NOTIFICATION_INTERVAL = 60000 // Ms
 
   static readonly DEFAULT_CIRCULAR_BUFFER_CAPACITY = 386
-  static readonly DEFAULT_CONNECTION_TIMEOUT = 30
+  static readonly DEFAULT_CONNECTION_TIMEOUT = 30 // Seconds
 
   static readonly DEFAULT_FLUCTUATION_PERCENT = 5
   static readonly DEFAULT_HASH_ALGORITHM = 'sha384'
@@ -87,11 +87,11 @@ export class Constants {
   static readonly DEFAULT_UI_SERVER_PORT = 8080
   static readonly EMPTY_FROZEN_OBJECT = Object.freeze({})
 
-  static readonly EMPTY_FUNCTION = Object.freeze(() => {
+  static readonly EMPTY_FUNCTION: () => void = Object.freeze(() => {
     /* This is intentional */
   })
 
-  static readonly MAX_RANDOM_INTEGER = 281474976710655
+  static readonly MAX_RANDOM_INTEGER = 281474976710655 // 2^48 - 1 (randomInit() limit)
 
   static readonly PERFORMANCE_RECORDS_TABLE = 'performance_records'
 
index d4282e0c31141ef50c80be99b357b5f3c75129f4..a38db441b92f8523e7c7018c34317e8f96cb95a0 100644 (file)
@@ -1,7 +1,7 @@
 import type { EventEmitterAsyncResource } from 'node:events'
 import type { PoolInfo } from 'poolifier'
 
-import { existsSync } from 'node:fs'
+import { statSync } from 'node:fs'
 
 import type { SetInfo, WorkerData, WorkerOptions } from './WorkerTypes.js'
 
@@ -29,8 +29,12 @@ export abstract class WorkerAbstract<D extends WorkerData, R extends WorkerData>
     if (workerScript.trim().length === 0) {
       throw new Error('Worker script is an empty string')
     }
-    if (!existsSync(workerScript)) {
-      throw new Error('Worker script file does not exist')
+    const workerScriptStats = statSync(workerScript, { throwIfNoEntry: false })
+    if (workerScriptStats == null) {
+      throw new Error(`Worker script file does not exist: '${workerScript}'`)
+    }
+    if (!workerScriptStats.isFile()) {
+      throw new Error(`Worker script is not a regular file: '${workerScript}'`)
     }
     this.workerScript = workerScript
     this.workerOptions = workerOptions
index 0d7407dcdef8887db1fe231620408e47cf000626..5f4084f57b123459b8d107fa4c15debb2fb5c84e 100644 (file)
@@ -1,4 +1,4 @@
-import { availableParallelism } from 'poolifier'
+import { availableParallelism, type ThreadPoolOptions } from 'poolifier'
 
 import type { WorkerOptions } from './WorkerTypes.js'
 
@@ -12,21 +12,26 @@ export const workerSetVersion = '1.0.1'
 
 export const DEFAULT_ELEMENT_ADD_DELAY = 0
 export const DEFAULT_WORKER_START_DELAY = 500
-export const DEFAULT_POOL_MIN_SIZE = Math.floor(availableParallelism() / 2)
-export const DEFAULT_POOL_MAX_SIZE = Math.round(availableParallelism() * 1.5)
+export const DEFAULT_POOL_MIN_SIZE = Math.max(1, Math.floor(availableParallelism() / 2))
+export const DEFAULT_POOL_MAX_SIZE = Math.max(
+  DEFAULT_POOL_MIN_SIZE,
+  Math.round(availableParallelism() * 1.5)
+)
 export const DEFAULT_ELEMENTS_PER_WORKER = 1
 
+export const DEFAULT_POOL_OPTIONS: Readonly<ThreadPoolOptions> = Object.freeze({
+  enableEvents: true,
+  errorHandler: defaultErrorHandler,
+  exitHandler: defaultExitHandler,
+  restartWorkerOnError: true,
+  startWorkers: false,
+})
+
 export const DEFAULT_WORKER_OPTIONS: Readonly<WorkerOptions> = Object.freeze({
   elementAddDelay: DEFAULT_ELEMENT_ADD_DELAY,
   elementsPerWorker: DEFAULT_ELEMENTS_PER_WORKER,
   poolMaxSize: DEFAULT_POOL_MAX_SIZE,
   poolMinSize: DEFAULT_POOL_MIN_SIZE,
-  poolOptions: {
-    enableEvents: true,
-    errorHandler: defaultErrorHandler,
-    exitHandler: defaultExitHandler,
-    restartWorkerOnError: true,
-    startWorkers: false,
-  },
+  poolOptions: DEFAULT_POOL_OPTIONS,
   workerStartDelay: DEFAULT_WORKER_START_DELAY,
 })
index e5808d300e5e05075639c973000d017994135293..f7ed91b1d91b08a2436dcc51636a597631958ffd 100644 (file)
@@ -48,10 +48,9 @@ export class WorkerDynamicPool<D extends WorkerData, R extends WorkerData> exten
   public async addElement (elementData: D): Promise<R> {
     const response = await this.pool.execute(elementData)
     // Start element sequentially to optimize memory at startup
-    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    this.workerOptions.elementAddDelay! > 0 &&
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      (await sleep(randomizeDelay(this.workerOptions.elementAddDelay!)))
+    if (this.workerOptions.elementAddDelay != null && this.workerOptions.elementAddDelay > 0) {
+      await sleep(randomizeDelay(this.workerOptions.elementAddDelay))
+    }
     return response
   }
 
index 8bddad7de428dc334efa0a2d9bd9ba61574214f8..f5d9b01e87c73a7ef88ea5306061329e1fe379b3 100644 (file)
@@ -47,10 +47,9 @@ export class WorkerFixedPool<D extends WorkerData, R extends WorkerData> extends
   public async addElement (elementData: D): Promise<R> {
     const response = await this.pool.execute(elementData)
     // Start element sequentially to optimize memory at startup
-    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    this.workerOptions.elementAddDelay! > 0 &&
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      (await sleep(randomizeDelay(this.workerOptions.elementAddDelay!)))
+    if (this.workerOptions.elementAddDelay != null && this.workerOptions.elementAddDelay > 0) {
+      await sleep(randomizeDelay(this.workerOptions.elementAddDelay))
+    }
     return response
   }
 
index babf6932487b903f594ae85a7f330e00c66ba7ba..b0f29e64f954943a60a9964834d08a7f8e587492 100644 (file)
@@ -51,7 +51,7 @@ export class WorkerSet<D extends WorkerData, R extends WorkerData> extends Worke
   }
 
   private readonly promiseResponseMap: Map<
-    `${string}-${string}-${string}-${string}`,
+    `${string}-${string}-${string}-${string}-${string}`,
     ResponseWrapper<R>
   >
 
@@ -77,7 +77,7 @@ export class WorkerSet<D extends WorkerData, R extends WorkerData> extends Worke
     }
     this.workerSet = new Set<WorkerSetElement>()
     this.promiseResponseMap = new Map<
-      `${string}-${string}-${string}-${string}`,
+      `${string}-${string}-${string}-${string}-${string}`,
       ResponseWrapper<R>
     >()
     if (this.workerOptions.poolOptions?.enableEvents === true) {
@@ -99,19 +99,17 @@ export class WorkerSet<D extends WorkerData, R extends WorkerData> extends Worke
         event: WorkerMessageEvents.addWorkerElement,
         uuid: randomUUID(),
       } satisfies WorkerMessage<D>
-      workerSetElement.worker.postMessage(message)
       this.promiseResponseMap.set(message.uuid, {
         reject,
         resolve,
         workerSetElement,
       })
+      workerSetElement.worker.postMessage(message)
     })
     const response = await sendMessageToWorker
     // Add element sequentially to optimize memory at startup
-    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-    if (this.workerOptions.elementAddDelay! > 0) {
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      await sleep(randomizeDelay(this.workerOptions.elementAddDelay!))
+    if (this.workerOptions.elementAddDelay != null && this.workerOptions.elementAddDelay > 0) {
+      await sleep(randomizeDelay(this.workerOptions.elementAddDelay))
     }
     return response
   }
@@ -124,8 +122,8 @@ export class WorkerSet<D extends WorkerData, R extends WorkerData> extends Worke
     this.workerOptions.workerStartDelay! > 0 &&
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)))
-    this.emitter?.emit(WorkerSetEvents.started, this.info)
     this.started = true
+    this.emitter?.emit(WorkerSetEvents.started, this.info)
   }
 
   /** @inheritDoc */
@@ -141,8 +139,20 @@ export class WorkerSet<D extends WorkerData, R extends WorkerData> extends Worke
       await worker.terminate()
       await waitWorkerExit
     }
-    this.emitter?.emit(WorkerSetEvents.stopped, this.info)
+    for (const [uuid, responseWrapper] of this.promiseResponseMap) {
+      try {
+        responseWrapper.reject(
+          new Error(`WorkerSet stopped before responding request (uuid: ${uuid})`)
+        )
+      } finally {
+        this.promiseResponseMap.delete(uuid)
+      }
+    }
+    if (this.workerSet.size > 0) {
+      this.workerSet.clear()
+    }
     this.started = false
+    this.emitter?.emit(WorkerSetEvents.stopped, this.info)
     this.emitter?.emitDestroy()
   }
 
@@ -160,6 +170,7 @@ export class WorkerSet<D extends WorkerData, R extends WorkerData> extends Worke
     worker.on('message', (message: WorkerMessage<R>) => {
       const { data, event, uuid } = message
       if (this.promiseResponseMap.has(uuid)) {
+        let error: Error
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         const { reject, resolve, workerSetElement } = this.promiseResponseMap.get(uuid)!
         switch (event) {
@@ -173,22 +184,32 @@ export class WorkerSet<D extends WorkerData, R extends WorkerData> extends Worke
             reject(data)
             break
           default:
-            reject(
-              new Error(
-                `Unknown worker message event: '${event}' received with data: '${JSON.stringify(
-                  data,
-                  undefined,
-                  2
-                )}'`
-              )
+            error = new Error(
+              `Unknown worker message event: '${event}' received with data: '${JSON.stringify(
+                data,
+                undefined,
+                2
+              )}'`
             )
+            this.emitter?.emit(WorkerSetEvents.error, error)
+            reject(error)
         }
         this.promiseResponseMap.delete(uuid)
+      } else {
+        this.emitter?.emit(WorkerSetEvents.elementError, {
+          data,
+          event,
+          message: `Unknown worker message uuid: '${uuid}'`,
+        })
       }
     })
     worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
     worker.once('error', error => {
       this.emitter?.emit(WorkerSetEvents.error, error)
+      const workerSetElement = this.getWorkerSetElementByWorker(worker)
+      if (workerSetElement != null) {
+        this.rejectPendingPromiseForWorker(workerSetElement, error)
+      }
       if (
         this.workerOptions.poolOptions?.restartWorkerOnError === true &&
         this.started &&
@@ -203,7 +224,11 @@ export class WorkerSet<D extends WorkerData, R extends WorkerData> extends Worke
     worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION)
     worker.once('exit', () => {
-      this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker))
+      const workerSetElement = this.getWorkerSetElementByWorker(worker)
+      if (workerSetElement != null) {
+        this.rejectPendingPromiseForWorker(workerSetElement, new Error('Worker exited'))
+      }
+      this.removeWorkerSetElement(workerSetElement)
     })
     const workerSetElement: WorkerSetElement = {
       numberOfWorkerElements: 0,
@@ -226,23 +251,36 @@ export class WorkerSet<D extends WorkerData, R extends WorkerData> extends Worke
     if (chosenWorkerSetElement == null) {
       chosenWorkerSetElement = this.addWorkerSetElement()
       // Add worker set element sequentially to optimize memory at startup
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      this.workerOptions.workerStartDelay! > 0 &&
-        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)))
+      if (this.workerOptions.workerStartDelay != null && this.workerOptions.workerStartDelay > 0) {
+        await sleep(randomizeDelay(this.workerOptions.workerStartDelay))
+      }
     }
     return chosenWorkerSetElement
   }
 
   private getWorkerSetElementByWorker (worker: Worker): undefined | WorkerSetElement {
-    let workerSetElt: undefined | WorkerSetElement
+    let workerSetElementFound: undefined | WorkerSetElement
     for (const workerSetElement of this.workerSet) {
       if (workerSetElement.worker.threadId === worker.threadId) {
-        workerSetElt = workerSetElement
+        workerSetElementFound = workerSetElement
         break
       }
     }
-    return workerSetElt
+    return workerSetElementFound
+  }
+
+  private rejectPendingPromiseForWorker (workerSetElement: WorkerSetElement, reason: unknown): void {
+    for (const [uuid, responseWrapper] of this.promiseResponseMap) {
+      if (responseWrapper.workerSetElement === workerSetElement) {
+        try {
+          responseWrapper.reject(
+            reason ?? new Error(`Worker failed before completing request (uuid: ${uuid})`)
+          )
+        } finally {
+          this.promiseResponseMap.delete(uuid)
+        }
+      }
+    }
   }
 
   private removeWorkerSetElement (workerSetElement: undefined | WorkerSetElement): void {
index c6b5c4737191252643c9e8524ae506f0e9b954e1..9a800870d9b4794d3fac63afbb643a2baf1e89ca 100644 (file)
@@ -52,7 +52,7 @@ export type WorkerEvents = PoolEvent | WorkerSetEvents
 export interface WorkerMessage<T extends WorkerData> {
   data: T
   event: WorkerMessageEvents
-  uuid: `${string}-${string}-${string}-${string}`
+  uuid: `${string}-${string}-${string}-${string}-${string}`
 }
 
 export interface WorkerOptions extends Record<string, unknown> {