Use a Map to cache OCPP requests in use
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 11 Sep 2021 19:45:59 +0000 (21:45 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sat, 11 Sep 2021 19:46:42 +0000 (21:46 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
package-lock.json
package.json
src/charging-station/ChargingStation.ts
src/charging-station/ocpp/OCPPRequestService.ts
src/types/WebSocket.ts
src/types/ocpp/Requests.ts

index 353f084be037a0ecd6f9cc0d1f80e6ea719b335c..a7e37489eb4e52aeaae93ea32c013379099e833b 100644 (file)
       }
     },
     "typescript": {
-      "version": "4.4.2",
-      "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.4.2.tgz",
-      "integrity": "sha512-gzP+t5W4hdy4c+68bfcv0t400HVJMMd2+H9B7gae1nQlBzCqvrXX+6GL/b3GAgyTH966pzrZ70/fRjwAtZksSQ==",
+      "version": "4.4.3",
+      "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.4.3.tgz",
+      "integrity": "sha512-4xfscpisVgqqDfPaJo5vkd+Qd/ItkoagnHpufr+i2QCHBsNYp+G7UAoyFl8aPtx879u38wPV65rZ8qbGZijalA==",
       "dev": true
     },
     "ua-parser-js": {
index f44caf17e901d5018918df4897422a432d9451b3..4f5aaa42e200c5a59f8fe4fa6b7ff63d8e043d80 100644 (file)
     "rollup-plugin-terser": "^7.0.2",
     "rollup-plugin-ts": "^1.4.1",
     "ts-node": "^10.2.1",
-    "typescript": "^4.4.2"
+    "typescript": "^4.4.3"
   }
 }
index 6a8df9313697177b898744b982ecc37c98fdaf87..2f1bbc3e787d8d5d7866d9e608b58672c038c804 100644 (file)
@@ -1,13 +1,14 @@
 // Partial Copyright Jerome Benoit. 2021. All Rights Reserved.
 
+import { AvailabilityType, BootNotificationRequest, IncomingRequest, IncomingRequestCommand, Request } from '../types/ocpp/Requests';
 import { BootNotificationResponse, RegistrationStatus } from '../types/ocpp/Responses';
 import ChargingStationConfiguration, { ConfigurationKey } from '../types/ChargingStationConfiguration';
 import ChargingStationTemplate, { CurrentType, PowerUnits, Voltage } from '../types/ChargingStationTemplate';
 import { ConnectorPhaseRotation, StandardParametersKey, SupportedFeatureProfiles } from '../types/ocpp/Configuration';
 import Connectors, { Connector, SampledValueTemplate } from '../types/Connectors';
 import { MeterValueMeasurand, MeterValuePhase } from '../types/ocpp/MeterValues';
-import Requests, { AvailabilityType, BootNotificationRequest, IncomingRequest, IncomingRequestCommand } from '../types/ocpp/Requests';
-import WebSocket, { ClientOptions, MessageEvent } from 'ws';
+import { WSError, WebSocketCloseEventStatusCode } from '../types/WebSocket';
+import WebSocket, { ClientOptions, Data } from 'ws';
 
 import AutomaticTransactionGenerator from './AutomaticTransactionGenerator';
 import { ChargePointStatus } from '../types/ocpp/ChargePointStatus';
@@ -30,7 +31,6 @@ import PerformanceStatistics from '../performance/PerformanceStatistics';
 import { StopTransactionReason } from '../types/ocpp/Transaction';
 import { URL } from 'url';
 import Utils from '../utils/Utils';
-import { WebSocketCloseEventStatusCode } from '../types/WebSocket';
 import crypto from 'crypto';
 import fs from 'fs';
 import logger from '../utils/Logger';
@@ -44,7 +44,7 @@ export default class ChargingStation {
   public configuration!: ChargingStationConfiguration;
   public hasStopped: boolean;
   public wsConnection!: WebSocket;
-  public requests: Requests;
+  public requests: Map<string, Request>;
   public messageQueue: string[];
   public performanceStatistics!: PerformanceStatistics;
   public heartbeatSetInterval!: NodeJS.Timeout;
@@ -70,8 +70,8 @@ export default class ChargingStation {
     this.hasSocketRestarted = false;
     this.autoReconnectRetryCount = 0;
 
-    this.requests = {} as Requests;
-    this.messageQueue = [] as string[];
+    this.requests = new Map<string, Request>();
+    this.messageQueue = new Array<string>();
 
     this.authorizedTags = this.getAuthorizedTags();
   }
@@ -236,7 +236,6 @@ export default class ChargingStation {
     for (let index = 0; !Utils.isEmptyArray(sampledValueTemplates) && index < sampledValueTemplates.length; index++) {
       if (!Constants.SUPPORTED_MEASURANDS.includes(sampledValueTemplates[index]?.measurand ?? MeterValueMeasurand.ENERGY_ACTIVE_IMPORT_REGISTER)) {
         logger.warn(`${this.logPrefix()} Unsupported MeterValues measurand ${measurand} ${phase ? `on phase ${phase} ` : ''}in template on connectorId ${connectorId}`);
-        continue;
       } else if (phase && sampledValueTemplates[index]?.phase === phase && sampledValueTemplates[index]?.measurand === measurand
           && this.getConfigurationKey(StandardParametersKey.MeterValuesSampledData).value.includes(measurand)) {
         return sampledValueTemplates[index];
@@ -646,14 +645,15 @@ export default class ChargingStation {
     }
   }
 
-  private async onMessage(messageEvent: MessageEvent): Promise<void> {
+  private async onMessage(data: Data): Promise<void> {
     let [messageType, messageId, commandName, commandPayload, errorDetails]: IncomingRequest = [0, '', '' as IncomingRequestCommand, {}, {}];
     let responseCallback: (payload: Record<string, unknown> | string, requestPayload: Record<string, unknown>) => void;
     let rejectCallback: (error: OCPPError) => void;
     let requestPayload: Record<string, unknown>;
+    let cachedRequest: Request;
     let errMsg: string;
     try {
-      const request = JSON.parse(messageEvent.toString()) as IncomingRequest;
+      const request = JSON.parse(data.toString()) as IncomingRequest;
       if (Utils.isIterable(request)) {
         // Parse the message
         [messageType, messageId, commandName, commandPayload, errorDetails] = request;
@@ -673,8 +673,9 @@ export default class ChargingStation {
         // Outcome Message
         case MessageType.CALL_RESULT_MESSAGE:
           // Respond
-          if (Utils.isIterable(this.requests[messageId])) {
-            [responseCallback, , requestPayload] = this.requests[messageId];
+          cachedRequest = this.requests.get(messageId);
+          if (Utils.isIterable(cachedRequest)) {
+            [responseCallback, , requestPayload] = cachedRequest;
           } else {
             throw new OCPPError(ErrorType.PROTOCOL_ERROR, `Response request for message id ${messageId} is not iterable`, commandName);
           }
@@ -682,21 +683,22 @@ export default class ChargingStation {
             // Error
             throw new OCPPError(ErrorType.INTERNAL_ERROR, `Response request for unknown message id ${messageId}`, commandName);
           }
-          delete this.requests[messageId];
+          this.requests.delete(messageId);
           responseCallback(commandName, requestPayload);
           break;
         // Error Message
         case MessageType.CALL_ERROR_MESSAGE:
-          if (!this.requests[messageId]) {
+          cachedRequest = this.requests.get(messageId);
+          if (!cachedRequest) {
             // Error
             throw new OCPPError(ErrorType.INTERNAL_ERROR, `Error request for unknown message id ${messageId}`);
           }
-          if (Utils.isIterable(this.requests[messageId])) {
-            [, rejectCallback] = this.requests[messageId];
+          if (Utils.isIterable(cachedRequest)) {
+            [, rejectCallback] = cachedRequest;
           } else {
             throw new OCPPError(ErrorType.PROTOCOL_ERROR, `Error request for message id ${messageId} is not iterable`);
           }
-          delete this.requests[messageId];
+          this.requests.delete(messageId);
           rejectCallback(new OCPPError(commandName, commandPayload.toString(), null, errorDetails));
           break;
         // Error
@@ -707,7 +709,7 @@ export default class ChargingStation {
       }
     } catch (error) {
       // Log
-      logger.error('%s Incoming request message %j processing error %j on content type %j', this.logPrefix(), messageEvent, error, this.requests[messageId]);
+      logger.error('%s Incoming request message %j processing error %j on content type %j', this.logPrefix(), data, error, this.requests.get(messageId));
       // Send error
       messageType !== MessageType.CALL_ERROR_MESSAGE && await this.ocppRequestService.sendError(messageId, error, commandName);
     }
@@ -721,11 +723,11 @@ export default class ChargingStation {
     logger.debug(this.logPrefix() + ' Received a WS pong (rfc6455) from the server');
   }
 
-  private async onError(errorEvent: any): Promise<void> {
-    logger.error(this.logPrefix() + ' Socket error: %j', errorEvent);
-    // switch (errorEvent.code) {
+  private async onError(error: WSError): Promise<void> {
+    logger.error(this.logPrefix() + ' Socket error: %j', error);
+    // switch (error.code) {
     //   case 'ECONNREFUSED':
-    //     await this._reconnect(errorEvent);
+    //     await this.reconnect(error);
     //     break;
     // }
   }
@@ -985,6 +987,7 @@ export default class ChargingStation {
             logger.debug(this.logPrefix() + ' Authorization file ' + authorizationFile + ' have changed, reload');
             // Initialize authorizedTags
             this.authorizedTags = this.getAuthorizedTags();
+            console.log('here');
           } catch (error) {
             logger.error(this.logPrefix() + ' Authorization file monitoring error: %j', error);
           }
@@ -1030,7 +1033,7 @@ export default class ChargingStation {
     return !Utils.isUndefined(this.stationInfo.reconnectExponentialDelay) ? this.stationInfo.reconnectExponentialDelay : false;
   }
 
-  private async reconnect(error: unknown): Promise<void> {
+  private async reconnect(error: Error): Promise<void> {
     // Stop WebSocket ping
     this.stopWebSocketPing();
     // Stop heartbeat
index 7b20343a33471aa560989f005948cdc4c142dadd..f89567064467af1f276ab91322e8af9e2570a881 100644 (file)
@@ -35,7 +35,7 @@ export default abstract class OCPPRequestService {
         // Request
         case MessageType.CALL_MESSAGE:
           // Build request
-          this.chargingStation.requests[messageId] = [responseCallback, rejectCallback, commandParams as Record<string, unknown>];
+          this.chargingStation.requests.set(messageId, [responseCallback, rejectCallback, commandParams as Record<string, unknown>]);
           messageToSend = JSON.stringify([messageType, messageId, commandName, commandParams]);
           break;
         // Response
@@ -100,7 +100,7 @@ export default abstract class OCPPRequestService {
         logger.debug(`${self.chargingStation.logPrefix()} Error: %j occurred when calling command %s with parameters: %j`, error, commandName, commandParams);
         // Build Exception
         // eslint-disable-next-line no-empty-function
-        self.chargingStation.requests[messageId] = [() => { }, () => { }, {}];
+        self.chargingStation.requests.set(messageId, [() => { }, () => { }, {}]);
         // Send error
         reject(error);
       }
index 8fedf7b4d37da4d7de00c330dcdb1df420cedbfe..60b2f681ef1464b960e26ba783688067149e7e62 100644 (file)
@@ -35,3 +35,7 @@ export enum WebSocketCloseEventStatusCode {
   CLOSE_BAD_GATEWAY = 1014,
   CLOSE_TLS_HANDSHAKE = 1015
 }
+
+export interface WSError extends Error {
+  code?: string
+}
index 89f836651c013bc81122326487ac2e4b0070cbc7..713fa0e6da50d64f42f89d4eb3f4a57f64783d20 100644 (file)
@@ -4,10 +4,6 @@ import { MessageType } from './MessageType';
 import { OCPP16DiagnosticsStatus } from './1.6/DiagnosticsStatus';
 import OCPPError from '../../charging-station/ocpp/OCPPError';
 
-export default interface Requests {
-  [id: string]: Request;
-}
-
 export type BootNotificationRequest = OCPP16BootNotificationRequest;
 
 export type AvailabilityType = OCPP16AvailabilityType;