Add WebSocketPingInterval support.
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 22 Nov 2020 13:58:34 +0000 (14:58 +0100)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 22 Nov 2020 13:58:34 +0000 (14:58 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/assets/station-templates/abb-atg.station-template.json
src/assets/station-templates/schneider.station-template.json
src/charging-station/ChargingStation.ts
src/utils/Constants.ts
src/utils/Statistics.ts

index edc4c82797b9948f45126cd4b72dda0e7d321a41..9f0cebd6c88ae061b6105d70f6a37fca7b55cba1 100644 (file)
         "readonly": false,
         "value": "false"
       },
+      {
+        "key": "WebSocketPingInterval",
+        "readonly": false,
+        "value": "60"
+      },
       {
         "key": "VendorKey",
         "readonly": false,
index 5583cf04e1f91428455680b756a9fd2800f99ca5..743c7fcc5a10cfcf4ca98fe74028cf177094568e 100644 (file)
         "key": "AuthorizeRemoteTxRequests",
         "readonly": false,
         "value": "false"
+      },
+      {
+        "key": "WebSocketPingInterval",
+        "readonly": false,
+        "value": "60"
       }
     ]
   },
index e509c0288118e0b59c7380a4e78f8680e0af6fc6..85045f7eb0aec2b391f9f6d041cdfe6ee95b8174 100644 (file)
@@ -5,6 +5,7 @@ import { ConfigurationResponse, DefaultRequestResponse, UnlockResponse } from '.
 import Connectors, { Connector } from '../types/Connectors';
 import MeterValue, { MeterValueLocation, MeterValueMeasurand, MeterValuePhase, MeterValueUnit } from '../types/ocpp/1.6/MeterValue';
 import { PerformanceObserver, performance } from 'perf_hooks';
+import WebSocket, { MessageEvent } from 'ws';
 
 import AutomaticTransactionGenerator from './AutomaticTransactionGenerator';
 import { ChargePointErrorCode } from '../types/ocpp/1.6/ChargePointErrorCode';
@@ -18,7 +19,6 @@ import OCPPError from './OcppError';
 import Requests from '../types/ocpp/1.6/Requests';
 import Statistics from '../utils/Statistics';
 import Utils from '../utils/Utils';
-import WebSocket from 'ws';
 import crypto from 'crypto';
 import fs from 'fs';
 import logger from '../utils/Logger';
@@ -51,6 +51,7 @@ export default class ChargingStation {
   private _authorizedTags: string[];
   private _heartbeatInterval: number;
   private _heartbeatSetInterval: NodeJS.Timeout;
+  private _webSocketPingSetInterval: NodeJS.Timeout;
   private _statistics: Statistics;
   private _performanceObserver: PerformanceObserver;
 
@@ -111,7 +112,7 @@ export default class ChargingStation {
       ...!Utils.isUndefined(this._stationInfo.chargeBoxSerialNumberPrefix) && { chargeBoxSerialNumber: this._stationInfo.chargeBoxSerialNumberPrefix },
       ...!Utils.isUndefined(this._stationInfo.firmwareVersion) && { firmwareVersion: this._stationInfo.firmwareVersion },
     };
-    this._configuration = this._getConfiguration();
+    this._configuration = this._getTemplateChargingStationConfiguration();
     this._supervisionUrl = this._getSupervisionURL();
     this._wsConnectionUrl = this._supervisionUrl + '/' + this._stationInfo.name;
     // Build connectors if needed
@@ -184,7 +185,7 @@ export default class ChargingStation {
     return Utils.logPrefix(` ${this._stationInfo.name}:`);
   }
 
-  _getConfiguration(): ChargingStationConfiguration {
+  _getTemplateChargingStationConfiguration(): ChargingStationConfiguration {
     return this._stationInfo.Configuration ? this._stationInfo.Configuration : {} as ChargingStationConfiguration;
   }
 
@@ -331,21 +332,25 @@ export default class ChargingStation {
     return localAuthListEnabled ? Utils.convertToBoolean(localAuthListEnabled.value) : false;
   }
 
-  _startMessageSequence(): void {
+  async _startMessageSequence(): Promise<void> {
+    // Start WebSocket ping
+    this._startWebSocketPing();
     // Start heartbeat
     this._startHeartbeat();
     // Initialize connectors status
     for (const connector in this._connectors) {
-      if (!this.getConnector(Utils.convertToInt(connector)).transactionStarted) {
-        if (!this.getConnector(Utils.convertToInt(connector)).status && this.getConnector(Utils.convertToInt(connector)).bootStatus) {
-          this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).bootStatus);
-        } else if (!this._hasStopped && this.getConnector(Utils.convertToInt(connector)).status) {
-          this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).status);
-        } else {
-          this.sendStatusNotification(Utils.convertToInt(connector), ChargePointStatus.AVAILABLE);
-        }
+      if (!this._hasStopped && !this.getConnector(Utils.convertToInt(connector)).status && this.getConnector(Utils.convertToInt(connector)).bootStatus) {
+        // Send status in template at startup
+        await this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).bootStatus);
+      } else if (this._hasStopped && this.getConnector(Utils.convertToInt(connector)).bootStatus) {
+        // Send status in template after reset
+        await this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).bootStatus);
+      } else if (!this._hasStopped && this.getConnector(Utils.convertToInt(connector)).status) {
+        // Send previous status at template reload
+        await this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).status);
       } else {
-        this.sendStatusNotification(Utils.convertToInt(connector), ChargePointStatus.CHARGING);
+        // Send default status
+        await this.sendStatusNotification(Utils.convertToInt(connector), ChargePointStatus.AVAILABLE);
       }
     }
     // Start the ATG
@@ -363,6 +368,8 @@ export default class ChargingStation {
   }
 
   async _stopMessageSequence(reason: StopTransactionReason = StopTransactionReason.NONE): Promise<void> {
+    // Stop WebSocket ping
+    this._stopWebSocketPing();
     // Stop heartbeat
     this._stopHeartbeat();
     // Stop the ATG
@@ -379,14 +386,46 @@ export default class ChargingStation {
     }
   }
 
+  _startWebSocketPing(): void {
+    const webSocketPingInterval: number = this._getConfigurationKey('WebSocketPingInterval') ? Utils.convertToInt(this._getConfigurationKey('WebSocketPingInterval').value) : 0;
+    if (webSocketPingInterval > 0 && !this._webSocketPingSetInterval) {
+      this._webSocketPingSetInterval = setInterval(() => {
+        if (this._wsConnection?.readyState === WebSocket.OPEN) {
+          this._wsConnection.ping((): void => {});
+        }
+      }, webSocketPingInterval * 1000);
+      logger.info(this._logPrefix() + ' WebSocket ping started every ' + Utils.secondsToHHMMSS(webSocketPingInterval));
+    } else if (this._webSocketPingSetInterval) {
+      logger.info(this._logPrefix() + ' WebSocket ping every ' + Utils.secondsToHHMMSS(webSocketPingInterval) + ' already started');
+    } else {
+      logger.error(`${this._logPrefix()} WebSocket ping interval set to ${webSocketPingInterval ? Utils.secondsToHHMMSS(webSocketPingInterval) : webSocketPingInterval}, not starting the WebSocket ping`);
+    }
+  }
+
+  _stopWebSocketPing(): void {
+    if (this._webSocketPingSetInterval) {
+      clearInterval(this._webSocketPingSetInterval);
+      this._webSocketPingSetInterval = null;
+    }
+  }
+
+  _restartWebSocketPing(): void {
+    // Stop WebSocket ping
+    this._stopWebSocketPing();
+    // Start WebSocket ping
+    this._startWebSocketPing();
+  }
+
   _startHeartbeat(): void {
     if (this._heartbeatInterval && this._heartbeatInterval > 0 && !this._heartbeatSetInterval) {
-      this._heartbeatSetInterval = setInterval(() => {
-        this.sendHeartbeat();
+      this._heartbeatSetInterval = setInterval(async () => {
+        await this.sendHeartbeat();
       }, this._heartbeatInterval);
       logger.info(this._logPrefix() + ' Heartbeat started every ' + Utils.milliSecondsToHHMMSS(this._heartbeatInterval));
+    } else if (this._heartbeatSetInterval) {
+      logger.info(this._logPrefix() + ' Heartbeat every ' + Utils.milliSecondsToHHMMSS(this._heartbeatInterval) + ' already started');
     } else {
-      logger.error(`${this._logPrefix()} Heartbeat interval set to ${Utils.milliSecondsToHHMMSS(this._heartbeatInterval)}, not starting the heartbeat`);
+      logger.error(`${this._logPrefix()} Heartbeat interval set to ${this._heartbeatInterval ? Utils.milliSecondsToHHMMSS(this._heartbeatInterval) : this._heartbeatInterval}, not starting the heartbeat`);
     }
   }
 
@@ -397,6 +436,13 @@ export default class ChargingStation {
     }
   }
 
+  _restartHeartbeat(): void {
+    // Stop heartbeat
+    this._stopHeartbeat();
+    // Start heartbeat
+    this._startHeartbeat();
+  }
+
   _startAuthorizationFileMonitoring(): void {
     // eslint-disable-next-line @typescript-eslint/no-unused-vars
     fs.watchFile(this._getAuthorizationFile(), (current, previous) => {
@@ -421,6 +467,7 @@ export default class ChargingStation {
           this._automaticTransactionGeneration) {
           this._automaticTransactionGeneration.stop().catch(() => { });
         }
+        // FIXME?: restart heartbeat and WebSocket ping when their interval values have changed
       } catch (error) {
         logger.error(this._logPrefix() + ' Charging station template file monitoring error: %j', error);
       }
@@ -452,12 +499,13 @@ export default class ChargingStation {
     }
   }
 
-  start(): void {
-    if (!this._wsConnectionUrl) {
-      this._wsConnectionUrl = this._supervisionUrl + '/' + this._stationInfo.name;
-    }
+  _openWSConnection(): void {
     this._wsConnection = new WebSocket(this._wsConnectionUrl, 'ocpp' + Constants.OCPP_VERSION_16);
     logger.info(this._logPrefix() + ' Will communicate through URL ' + this._supervisionUrl);
+  }
+
+  start(): void {
+    this._openWSConnection();
     // Monitor authorization file
     this._startAuthorizationFileMonitoring();
     // Monitor station template file
@@ -472,16 +520,18 @@ export default class ChargingStation {
     this._wsConnection.on('open', this.onOpen.bind(this));
     // Handle Socket ping
     this._wsConnection.on('ping', this.onPing.bind(this));
+    // Handle Socket pong
+    this._wsConnection.on('pong', this.onPong.bind(this));
   }
 
   async stop(reason: StopTransactionReason = StopTransactionReason.NONE): Promise<void> {
-    // Stop
+    // Stop message sequence
     await this._stopMessageSequence(reason);
     // eslint-disable-next-line guard-for-in
     for (const connector in this._connectors) {
       await this.sendStatusNotification(Utils.convertToInt(connector), ChargePointStatus.UNAVAILABLE);
     }
-    if (this._wsConnection && this._wsConnection.readyState === WebSocket.OPEN) {
+    if (this._wsConnection?.readyState === WebSocket.OPEN) {
       this._wsConnection.close();
     }
     this._hasStopped = true;
@@ -489,6 +539,8 @@ export default class ChargingStation {
 
   _reconnect(error): void {
     logger.error(this._logPrefix() + ' Socket: abnormally closed: %j', error);
+    // Stop heartbeat
+    this._stopHeartbeat();
     // Stop the ATG if needed
     if (this._stationInfo.AutomaticTransactionGenerator.enable &&
       this._stationInfo.AutomaticTransactionGenerator.stopOnConnectionFailure &&
@@ -496,44 +548,42 @@ export default class ChargingStation {
       !this._automaticTransactionGeneration.timeToStop) {
       this._automaticTransactionGeneration.stop().catch(() => { });
     }
-    // Stop heartbeat
-    this._stopHeartbeat();
     if (this._autoReconnectTimeout !== 0 &&
       (this._autoReconnectRetryCount < this._autoReconnectMaxRetries || this._autoReconnectMaxRetries === -1)) {
       logger.error(`${this._logPrefix()} Socket: connection retry with timeout ${this._autoReconnectTimeout}ms`);
       this._autoReconnectRetryCount++;
       setTimeout(() => {
         logger.error(this._logPrefix() + ' Socket: reconnecting try #' + this._autoReconnectRetryCount.toString());
-        this.start();
+        this._openWSConnection();
       }, this._autoReconnectTimeout);
     } else if (this._autoReconnectTimeout !== 0 || this._autoReconnectMaxRetries !== -1) {
       logger.error(`${this._logPrefix()} Socket: max retries reached (${this._autoReconnectRetryCount}) or retry disabled (${this._autoReconnectTimeout})`);
     }
   }
 
-  onOpen(): void {
+  async onOpen(): Promise<void> {
     logger.info(`${this._logPrefix()} Is connected to server through ${this._wsConnectionUrl}`);
-    if (!this._hasSocketRestarted) {
+    if (!this._hasSocketRestarted || this._hasStopped) {
       // Send BootNotification
-      this.sendBootNotification();
+      await this.sendBootNotification();
     }
+    await this._startMessageSequence();
     if (this._hasSocketRestarted) {
-      this._startMessageSequence();
       if (!Utils.isEmptyArray(this._messageQueue)) {
         this._messageQueue.forEach((message, index) => {
-          if (this._wsConnection && this._wsConnection.readyState === WebSocket.OPEN) {
+          if (this._wsConnection?.readyState === WebSocket.OPEN) {
             this._messageQueue.splice(index, 1);
             this._wsConnection.send(message);
           }
         });
       }
     }
-    this._autoReconnectRetryCount = 0;
     this._hasSocketRestarted = false;
+    this._autoReconnectRetryCount = 0;
   }
 
   onError(errorEvent): void {
-    switch (errorEvent) {
+    switch (errorEvent.code) {
       case 'ECONNREFUSED':
         this._hasSocketRestarted = true;
         this._reconnect(errorEvent);
@@ -562,11 +612,15 @@ export default class ChargingStation {
     logger.debug(this._logPrefix() + ' Has received a WS ping (rfc6455) from the server');
   }
 
-  async onMessage(messageEvent): Promise<void> {
+  onPong(): void {
+    logger.debug(this._logPrefix() + ' Has received a WS pong (rfc6455) from the server');
+  }
+
+  async onMessage(messageEvent: MessageEvent): Promise<void> {
     let [messageType, messageId, commandName, commandPayload, errorDetails] = [0, '', Constants.ENTITY_CHARGING_STATION, '', ''];
     try {
       // Parse the message
-      [messageType, messageId, commandName, commandPayload, errorDetails] = JSON.parse(messageEvent);
+      [messageType, messageId, commandName, commandPayload, errorDetails] = JSON.parse(messageEvent.toString());
 
       // Check the Type of message
       switch (messageType) {
@@ -622,7 +676,7 @@ export default class ChargingStation {
       // Log
       logger.error('%s Incoming message %j processing error %s on request content type %s', this._logPrefix(), messageEvent, error, this._requests[messageId]);
       // Send error
-      await this.sendError(messageId, error, commandName);
+      messageType !== Constants.OCPP_JSON_CALL_ERROR_MESSAGE && await this.sendError(messageId, error, commandName);
     }
   }
 
@@ -953,7 +1007,7 @@ export default class ChargingStation {
           break;
       }
       // Check if wsConnection is ready
-      if (this._wsConnection && this._wsConnection.readyState === WebSocket.OPEN) {
+      if (this._wsConnection?.readyState === WebSocket.OPEN) {
         if (this.getEnableStatistics()) {
           this._statistics.addMessage(commandName, messageType);
         }
@@ -974,7 +1028,7 @@ export default class ChargingStation {
           this._messageQueue.push(messageToSend);
         }
         // Reject it
-        return rejectCallback(new OCPPError(commandParams.code ? commandParams.code : Constants.OCPP_ERROR_GENERIC_ERROR, commandParams.message ? commandParams.message : `Web socket closed for message id '${messageId}' with content '${messageToSend}', message buffered`, commandParams.details ? commandParams.details : {}));
+        return rejectCallback(new OCPPError(commandParams.code ? commandParams.code : Constants.OCPP_ERROR_GENERIC_ERROR, commandParams.message ? commandParams.message : `WebSocket closed for message id '${messageId}' with content '${messageToSend}', message buffered`, commandParams.details ? commandParams.details : {}));
       }
       // Response?
       if (messageType === Constants.OCPP_JSON_CALL_RESULT_MESSAGE) {
@@ -1021,10 +1075,10 @@ export default class ChargingStation {
 
   handleResponseBootNotification(payload, requestPayload): void {
     if (payload.status === 'Accepted') {
-      this._heartbeatInterval = payload.interval * 1000;
+      this._heartbeatInterval = Utils.convertToInt(payload.interval) * 1000;
+      this._heartbeatSetInterval ? this._restartHeartbeat() : this._startHeartbeat();
       this._addConfigurationKey('HeartBeatInterval', payload.interval);
       this._addConfigurationKey('HeartbeatInterval', payload.interval, false, false);
-      this._startMessageSequence();
       this._hasStopped && (this._hasStopped = false);
     } else if (payload.status === 'Pending') {
       logger.info(this._logPrefix() + ' Charging station in pending state on the central server');
@@ -1248,22 +1302,26 @@ export default class ChargingStation {
       return Constants.OCPP_CONFIGURATION_RESPONSE_REJECTED;
     } else if (keyToChange && !keyToChange.readonly) {
       const keyIndex = this._configuration.configurationKey.indexOf(keyToChange);
-      this._configuration.configurationKey[keyIndex].value = commandPayload.value;
+      let valueChanged = false;
+      if (this._configuration.configurationKey[keyIndex].value !== commandPayload.value) {
+        this._configuration.configurationKey[keyIndex].value = commandPayload.value as string;
+        valueChanged = true;
+      }
       let triggerHeartbeatRestart = false;
-      if (keyToChange.key === 'HeartBeatInterval') {
+      if (keyToChange.key === 'HeartBeatInterval' && valueChanged) {
         this._setConfigurationKeyValue('HeartbeatInterval', commandPayload.value);
         triggerHeartbeatRestart = true;
       }
-      if (keyToChange.key === 'HeartbeatInterval') {
+      if (keyToChange.key === 'HeartbeatInterval' && valueChanged) {
         this._setConfigurationKeyValue('HeartBeatInterval', commandPayload.value);
         triggerHeartbeatRestart = true;
       }
       if (triggerHeartbeatRestart) {
         this._heartbeatInterval = Utils.convertToInt(commandPayload.value) * 1000;
-        // Stop heartbeat
-        this._stopHeartbeat();
-        // Start heartbeat
-        this._startHeartbeat();
+        this._restartHeartbeat();
+      }
+      if (keyToChange.key === 'WebSocketPingInterval' && valueChanged) {
+        this._restartWebSocketPing();
       }
       if (keyToChange.reboot) {
         return Constants.OCPP_CONFIGURATION_RESPONSE_REBOOT_REQUIRED;
index cf5d9bfaa38d303f2f512e5c348be6bd30dab6f0..56652b374e5ff0c301218c1eea7cb8d683274c92 100644 (file)
@@ -50,7 +50,7 @@ export default class Constants {
   static readonly CHARGING_STATION_DEFAULT_RESET_TIME = 60000; // Ms
   static readonly CHARGING_STATION_ATG_WAIT_TIME = 2000; // Ms
 
-  static readonly MAXIMUM_MEASUREMENTS_NUMBER = 5000;
+  static readonly MAXIMUM_MEASUREMENTS_NUMBER = 2000;
 
   static readonly TRANSACTION_DEFAULT_IDTAG = '00000000';
 }
index d687368c008b8043d80ec9638b13d00b0d7365d9..a7dfaaa7e4e2e0cd612bd643a3d7101345564a2e 100644 (file)
@@ -74,14 +74,18 @@ export default class Statistics {
     perfEntry.entryType = entry.entryType;
     perfEntry.startTime = entry.startTime;
     perfEntry.duration = entry.duration;
-    logger.info(`${this._logPrefix()} object ${className} method performance entry: %j`, perfEntry);
+    logger.info(`${this._logPrefix()} object ${className} method(s) performance entry: %j`, perfEntry);
   }
 
-  _display(): void {
+  start(): void {
+    this._displayInterval();
+  }
+
+  private _display(): void {
     logger.info(this._logPrefix() + ' %j', this._commandsStatistics);
   }
 
-  _displayInterval(): void {
+  private _displayInterval(): void {
     if (Configuration.getStatisticsDisplayInterval() > 0) {
       setInterval(() => {
         this._display();
@@ -90,10 +94,6 @@ export default class Statistics {
     }
   }
 
-  start(): void {
-    this._displayInterval();
-  }
-
   private median(dataSet: number[]): number {
     if (Array.isArray(dataSet) && dataSet.length === 1) {
       return dataSet[0];