fix: untangle worker set message from application message
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 22 Mar 2024 23:47:13 +0000 (00:47 +0100)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 22 Mar 2024 23:47:13 +0000 (00:47 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/charging-station/Bootstrap.ts
src/charging-station/ChargingStationWorker.ts
src/performance/storage/None.ts
src/types/ChargingStationWorker.ts
src/types/index.ts
src/worker/WorkerSet.ts
src/worker/WorkerTypes.ts
src/worker/index.ts

index 4f72d6dfdffef0ca7a65cb24a2103f19558ced6f..c13eb1b187cc7d6a60cb93e772096348f44c2309 100644 (file)
@@ -18,7 +18,6 @@ import {
   type ChargingStationInfo,
   type ChargingStationOptions,
   type ChargingStationWorkerData,
-  type ChargingStationWorkerEventError,
   type ChargingStationWorkerMessage,
   type ChargingStationWorkerMessageData,
   ChargingStationWorkerMessageEvents,
@@ -43,7 +42,7 @@ import {
   logger,
   logPrefix
 } from '../utils/index.js'
-import { type WorkerAbstract, WorkerFactory } from '../worker/index.js'
+import { DEFAULT_ELEMENTS_PER_WORKER, type WorkerAbstract, WorkerFactory } from '../worker/index.js'
 import { buildTemplateName, waitChargingStationEvents } from './Helpers.js'
 import type { AbstractUIServer } from './ui-server/AbstractUIServer.js'
 import { UIServerFactory } from './ui-server/UIServerFactory.js'
@@ -167,15 +166,6 @@ export class Bootstrap extends EventEmitter {
           ChargingStationWorkerMessageEvents.performanceStatistics,
           this.workerEventPerformanceStatistics
         )
-        this.on(
-          ChargingStationWorkerMessageEvents.workerElementError,
-          (eventError: ChargingStationWorkerEventError) => {
-            logger.error(
-              `${this.logPrefix()} ${moduleName}.start: Error occurred while handling '${eventError.event}' event on worker:`,
-              eventError
-            )
-          }
-        )
         // eslint-disable-next-line @typescript-eslint/unbound-method
         if (isAsyncFunction(this.workerImplementation?.start)) {
           await this.workerImplementation.start()
@@ -340,12 +330,13 @@ export class Bootstrap extends EventEmitter {
         elementsPerWorker = this.numberOfConfiguredChargingStations
         break
       case 'auto':
-      default:
         elementsPerWorker =
           this.numberOfConfiguredChargingStations > availableParallelism()
             ? Math.round(this.numberOfConfiguredChargingStations / (availableParallelism() * 1.5))
             : 1
         break
+      default:
+        elementsPerWorker = workerConfiguration.elementsPerWorker ?? DEFAULT_ELEMENTS_PER_WORKER
     }
     this.workerImplementation = WorkerFactory.getWorkerImplementation<
     ChargingStationWorkerData,
@@ -405,12 +396,6 @@ export class Bootstrap extends EventEmitter {
         case ChargingStationWorkerMessageEvents.performanceStatistics:
           this.emit(ChargingStationWorkerMessageEvents.performanceStatistics, msg.data)
           break
-        case ChargingStationWorkerMessageEvents.addedWorkerElement:
-          this.emit(ChargingStationWorkerMessageEvents.addWorkerElement, msg.data)
-          break
-        case ChargingStationWorkerMessageEvents.workerElementError:
-          this.emit(ChargingStationWorkerMessageEvents.workerElementError, msg.data)
-          break
         default:
           throw new BaseError(
             `Unknown charging station worker event: '${
index 042d3d5bfc2b3bc913484c5a0f5d0982b982148f..779c638a49fab5ec30119e184682335afa01b20d 100644 (file)
@@ -5,14 +5,9 @@ import { parentPort } from 'node:worker_threads'
 import { ThreadWorker } from 'poolifier'
 
 import { BaseError } from '../exception/index.js'
-import type {
-  ChargingStationInfo,
-  ChargingStationWorkerData,
-  ChargingStationWorkerEventError,
-  ChargingStationWorkerMessage
-} from '../types/index.js'
+import type { ChargingStationInfo, ChargingStationWorkerData } from '../types/index.js'
 import { Configuration } from '../utils/index.js'
-import { type WorkerMessage, WorkerMessageEvents } from '../worker/index.js'
+import { type WorkerDataError, type WorkerMessage, WorkerMessageEvents } from '../worker/index.js'
 import { ChargingStation } from './ChargingStation.js'
 
 export let chargingStationWorker: object
@@ -21,43 +16,47 @@ if (Configuration.workerPoolInUse()) {
   ChargingStationWorkerData,
   ChargingStationInfo | undefined
   >((data?: ChargingStationWorkerData): ChargingStationInfo | undefined => {
-    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion, no-new
-    return new ChargingStation(data!.index, data!.templateFile, data!.options).stationInfo
+    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+    const { index, templateFile, options } = data!
+    return new ChargingStation(index, templateFile, options).stationInfo
   })
 } else {
   // eslint-disable-next-line @typescript-eslint/no-extraneous-class
   class ChargingStationWorker<Data extends ChargingStationWorkerData> {
     constructor () {
       parentPort?.on('message', (message: WorkerMessage<Data>) => {
-        switch (message.event) {
+        const { uuid, event, data } = message
+        switch (event) {
           case WorkerMessageEvents.addWorkerElement:
             try {
               const chargingStation = new ChargingStation(
-                message.data.index,
-                message.data.templateFile,
-                message.data.options
+                data.index,
+                data.templateFile,
+                data.options
               )
               parentPort?.postMessage({
+                uuid,
                 event: WorkerMessageEvents.addedWorkerElement,
                 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
                 data: chargingStation.stationInfo!
-              } satisfies ChargingStationWorkerMessage<ChargingStationInfo>)
+              } satisfies WorkerMessage<ChargingStationInfo>)
             } catch (error) {
               parentPort?.postMessage({
+                uuid,
                 event: WorkerMessageEvents.workerElementError,
                 data: {
-                  event: message.event,
+                  event,
                   name: (error as Error).name,
                   message: (error as Error).message,
                   stack: (error as Error).stack
                 }
-              } satisfies ChargingStationWorkerMessage<ChargingStationWorkerEventError>)
+              } satisfies WorkerMessage<WorkerDataError>)
             }
             break
           default:
             throw new BaseError(
-              `Unknown worker event: '${message.event}' received with data: '${JSON.stringify(
-                message.data,
+              `Unknown worker event: '${event}' received with data: '${JSON.stringify(
+                data,
                 undefined,
                 2
               )}'`
index af3e273e2f5c039973a9159d8512ec637cbae641..aae7547f5b32afb656c023106c01babd846205b4 100644 (file)
@@ -13,7 +13,7 @@ export class None extends Storage {
   }
 
   public open (): void {
-    /** Intentionally empty   */
+    /** Intentionally empty */
   }
 
   public close (): void {
index c6fd0af2afdb84bb83cd354a4e00f8a3369d85c8..336b50e87c76da63a95425fca72711805a8b24ce 100644 (file)
@@ -1,6 +1,6 @@
 import type { WebSocket } from 'ws'
 
-import { type WorkerData, type WorkerMessage, WorkerMessageEvents } from '../worker/index.js'
+import { type WorkerData, type WorkerMessage } from '../worker/index.js'
 import type { ChargingStationAutomaticTransactionGeneratorConfiguration } from './AutomaticTransactionGenerator.js'
 import { ChargingStationEvents } from './ChargingStationEvents.js'
 import type { ChargingStationInfo } from './ChargingStationInfo.js'
@@ -52,32 +52,19 @@ enum ChargingStationMessageEvents {
 }
 
 export const ChargingStationWorkerMessageEvents = {
-  ...WorkerMessageEvents,
   ...ChargingStationEvents,
   ...ChargingStationMessageEvents
 } as const
 // eslint-disable-next-line @typescript-eslint/no-redeclare
 export type ChargingStationWorkerMessageEvents =
-  | WorkerMessageEvents
   | ChargingStationEvents
   | ChargingStationMessageEvents
 
-export interface ChargingStationWorkerEventError extends WorkerData {
-  event: WorkerMessageEvents
-  name: string
-  message: string
-  stack?: string
-}
-
-export type ChargingStationWorkerMessageData =
-  | ChargingStationInfo
-  | ChargingStationData
-  | Statistics
-  | ChargingStationWorkerEventError
+export type ChargingStationWorkerMessageData = ChargingStationData | Statistics
 
 export type ChargingStationWorkerMessage<T extends ChargingStationWorkerMessageData> = Omit<
 WorkerMessage<T>,
-'event'
+'uuid' | 'event'
 > & {
   event: ChargingStationWorkerMessageEvents
 }
index e24d7f5437cfbff9afe4e204bb185d2749100af1..868dd73df07ea4fa37b95a333357c8bf4436a381 100644 (file)
@@ -27,7 +27,6 @@ export {
   type ChargingStationData,
   type ChargingStationOptions,
   type ChargingStationWorkerData,
-  type ChargingStationWorkerEventError,
   type ChargingStationWorkerMessage,
   type ChargingStationWorkerMessageData,
   ChargingStationWorkerMessageEvents,
index 97414af78c769ab44c61433ed8276b2b04d2500c..7636a167a5548993b68f6a17791f22dc33bfbd5d 100644 (file)
@@ -1,5 +1,6 @@
 // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
 
+import { randomUUID } from 'node:crypto'
 import { EventEmitterAsyncResource } from 'node:events'
 import { SHARE_ENV, Worker } from 'node:worker_threads'
 
@@ -16,9 +17,20 @@ import {
 } from './WorkerTypes.js'
 import { randomizeDelay, sleep } from './WorkerUtils.js'
 
+interface ResponseWrapper<R extends WorkerData> {
+  resolve: (value: R | PromiseLike<R>) => void
+  reject: (reason?: unknown) => void
+  workerSetElement: WorkerSetElement
+}
+
 export class WorkerSet<D extends WorkerData, R extends WorkerData> extends WorkerAbstract<D, R> {
   public readonly emitter: EventEmitterAsyncResource | undefined
   private readonly workerSet: Set<WorkerSetElement>
+  private readonly promiseResponseMap: Map<
+    `${string}-${string}-${string}-${string}`,
+  ResponseWrapper<R>
+  >
+
   private started: boolean
   private workerStartup: boolean
 
@@ -40,6 +52,10 @@ export class WorkerSet<D extends WorkerData, R extends WorkerData> extends Worke
       throw new RangeError('Elements per worker must be greater than zero')
     }
     this.workerSet = new Set<WorkerSetElement>()
+    this.promiseResponseMap = new Map<
+      `${string}-${string}-${string}-${string}`,
+    ResponseWrapper<R>
+    >()
     if (this.workerOptions.poolOptions?.enableEvents === true) {
       this.emitter = new EventEmitterAsyncResource({ name: 'workerset' })
     }
@@ -107,25 +123,16 @@ export class WorkerSet<D extends WorkerData, R extends WorkerData> extends Worke
       throw new Error('Cannot add a WorkerSet element: not started')
     }
     const workerSetElement = await this.getWorkerSetElement()
-    const waitAddedWorkerElement = new Promise<R>((resolve, reject) => {
-      const messageHandler = (message: WorkerMessage<R>): void => {
-        if (message.event === WorkerMessageEvents.addedWorkerElement) {
-          ++workerSetElement.numberOfWorkerElements
-          resolve(message.data)
-          workerSetElement.worker.off('message', messageHandler)
-        } else if (message.event === WorkerMessageEvents.workerElementError) {
-          // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
-          reject(message.data)
-          workerSetElement.worker.off('message', messageHandler)
-        }
-      }
-      workerSetElement.worker.on('message', messageHandler)
-    })
-    workerSetElement.worker.postMessage({
-      event: WorkerMessageEvents.addWorkerElement,
-      data: elementData
+    const sendMessageToWorker = new Promise<R>((resolve, reject) => {
+      const message = {
+        uuid: randomUUID(),
+        event: WorkerMessageEvents.addWorkerElement,
+        data: elementData
+      } satisfies WorkerMessage<D>
+      workerSetElement.worker.postMessage(message)
+      this.promiseResponseMap.set(message.uuid, { resolve, reject, workerSetElement })
     })
-    const response = await waitAddedWorkerElement
+    const response = await sendMessageToWorker
     // Add element sequentially to optimize memory at startup
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
     if (this.workerOptions.elementAddDelay! > 0) {
@@ -146,10 +153,18 @@ export class WorkerSet<D extends WorkerData, R extends WorkerData> extends Worke
     })
     worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION)
     worker.on('message', (message: WorkerMessage<R>) => {
-      if (message.event === WorkerMessageEvents.addedWorkerElement) {
-        this.emitter?.emit(WorkerSetEvents.elementAdded, this.info)
-      } else if (message.event === WorkerMessageEvents.workerElementError) {
-        this.emitter?.emit(WorkerSetEvents.elementError, message.data)
+      if (this.promiseResponseMap.has(message.uuid)) {
+        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+        const { resolve, reject, workerSetElement } = this.promiseResponseMap.get(message.uuid)!
+        if (message.event === WorkerMessageEvents.addedWorkerElement) {
+          this.emitter?.emit(WorkerSetEvents.elementAdded, this.info)
+          workerSetElement.numberOfWorkerElements++
+          resolve(message.data)
+        } else if (message.event === WorkerMessageEvents.workerElementError) {
+          this.emitter?.emit(WorkerSetEvents.elementError, message.data)
+          reject(message.data)
+        }
+        this.promiseResponseMap.delete(message.uuid)
       }
     })
     worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
index ee69c191e4a8da04c08984d74a255ef3d0b40ac4..e1e89a7cde183ac99841b55fdc67b04ad0839329 100644 (file)
@@ -51,6 +51,7 @@ export interface WorkerSetElement {
 }
 
 export interface WorkerMessage<T extends WorkerData> {
+  uuid: `${string}-${string}-${string}-${string}`
   event: WorkerMessageEvents
   data: T
 }
@@ -60,3 +61,10 @@ export enum WorkerMessageEvents {
   addedWorkerElement = 'addedWorkerElement',
   workerElementError = 'workerElementError'
 }
+
+export interface WorkerDataError extends WorkerData {
+  event: WorkerMessageEvents
+  name: string
+  message: string
+  stack?: string
+}
index 649e0c62920bdd33330da18dc744cd1e7d58fa34..190a6a41034d2738ef9b5e434cec82dc29917619 100644 (file)
@@ -1,6 +1,7 @@
 export type { WorkerAbstract } from './WorkerAbstract.js'
 export {
   DEFAULT_ELEMENT_ADD_DELAY,
+  DEFAULT_ELEMENTS_PER_WORKER,
   DEFAULT_POOL_MAX_SIZE,
   DEFAULT_POOL_MIN_SIZE,
   DEFAULT_WORKER_START_DELAY
@@ -8,6 +9,7 @@ export {
 export { WorkerFactory } from './WorkerFactory.js'
 export {
   type WorkerData,
+  type WorkerDataError,
   WorkerEvents,
   type WorkerMessage,
   WorkerMessageEvents,