From 136c90bad539d32137cfabd0d2d97bf0aa89be66 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 22 Nov 2020 14:58:34 +0100 Subject: [PATCH] Add WebSocketPingInterval support. MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../abb-atg.station-template.json | 5 + .../schneider.station-template.json | 5 + src/charging-station/ChargingStation.ts | 150 ++++++++++++------ src/utils/Constants.ts | 2 +- src/utils/Statistics.ts | 14 +- 5 files changed, 122 insertions(+), 54 deletions(-) diff --git a/src/assets/station-templates/abb-atg.station-template.json b/src/assets/station-templates/abb-atg.station-template.json index edc4c827..9f0cebd6 100644 --- a/src/assets/station-templates/abb-atg.station-template.json +++ b/src/assets/station-templates/abb-atg.station-template.json @@ -40,6 +40,11 @@ "readonly": false, "value": "false" }, + { + "key": "WebSocketPingInterval", + "readonly": false, + "value": "60" + }, { "key": "VendorKey", "readonly": false, diff --git a/src/assets/station-templates/schneider.station-template.json b/src/assets/station-templates/schneider.station-template.json index 5583cf04..743c7fcc 100644 --- a/src/assets/station-templates/schneider.station-template.json +++ b/src/assets/station-templates/schneider.station-template.json @@ -36,6 +36,11 @@ "key": "AuthorizeRemoteTxRequests", "readonly": false, "value": "false" + }, + { + "key": "WebSocketPingInterval", + "readonly": false, + "value": "60" } ] }, diff --git a/src/charging-station/ChargingStation.ts b/src/charging-station/ChargingStation.ts index e509c028..85045f7e 100644 --- a/src/charging-station/ChargingStation.ts +++ b/src/charging-station/ChargingStation.ts @@ -5,6 +5,7 @@ import { ConfigurationResponse, DefaultRequestResponse, UnlockResponse } from '. import Connectors, { Connector } from '../types/Connectors'; import MeterValue, { MeterValueLocation, MeterValueMeasurand, MeterValuePhase, MeterValueUnit } from '../types/ocpp/1.6/MeterValue'; import { PerformanceObserver, performance } from 'perf_hooks'; +import WebSocket, { MessageEvent } from 'ws'; import AutomaticTransactionGenerator from './AutomaticTransactionGenerator'; import { ChargePointErrorCode } from '../types/ocpp/1.6/ChargePointErrorCode'; @@ -18,7 +19,6 @@ import OCPPError from './OcppError'; import Requests from '../types/ocpp/1.6/Requests'; import Statistics from '../utils/Statistics'; import Utils from '../utils/Utils'; -import WebSocket from 'ws'; import crypto from 'crypto'; import fs from 'fs'; import logger from '../utils/Logger'; @@ -51,6 +51,7 @@ export default class ChargingStation { private _authorizedTags: string[]; private _heartbeatInterval: number; private _heartbeatSetInterval: NodeJS.Timeout; + private _webSocketPingSetInterval: NodeJS.Timeout; private _statistics: Statistics; private _performanceObserver: PerformanceObserver; @@ -111,7 +112,7 @@ export default class ChargingStation { ...!Utils.isUndefined(this._stationInfo.chargeBoxSerialNumberPrefix) && { chargeBoxSerialNumber: this._stationInfo.chargeBoxSerialNumberPrefix }, ...!Utils.isUndefined(this._stationInfo.firmwareVersion) && { firmwareVersion: this._stationInfo.firmwareVersion }, }; - this._configuration = this._getConfiguration(); + this._configuration = this._getTemplateChargingStationConfiguration(); this._supervisionUrl = this._getSupervisionURL(); this._wsConnectionUrl = this._supervisionUrl + '/' + this._stationInfo.name; // Build connectors if needed @@ -184,7 +185,7 @@ export default class ChargingStation { return Utils.logPrefix(` ${this._stationInfo.name}:`); } - _getConfiguration(): ChargingStationConfiguration { + _getTemplateChargingStationConfiguration(): ChargingStationConfiguration { return this._stationInfo.Configuration ? this._stationInfo.Configuration : {} as ChargingStationConfiguration; } @@ -331,21 +332,25 @@ export default class ChargingStation { return localAuthListEnabled ? Utils.convertToBoolean(localAuthListEnabled.value) : false; } - _startMessageSequence(): void { + async _startMessageSequence(): Promise { + // Start WebSocket ping + this._startWebSocketPing(); // Start heartbeat this._startHeartbeat(); // Initialize connectors status for (const connector in this._connectors) { - if (!this.getConnector(Utils.convertToInt(connector)).transactionStarted) { - if (!this.getConnector(Utils.convertToInt(connector)).status && this.getConnector(Utils.convertToInt(connector)).bootStatus) { - this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).bootStatus); - } else if (!this._hasStopped && this.getConnector(Utils.convertToInt(connector)).status) { - this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).status); - } else { - this.sendStatusNotification(Utils.convertToInt(connector), ChargePointStatus.AVAILABLE); - } + if (!this._hasStopped && !this.getConnector(Utils.convertToInt(connector)).status && this.getConnector(Utils.convertToInt(connector)).bootStatus) { + // Send status in template at startup + await this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).bootStatus); + } else if (this._hasStopped && this.getConnector(Utils.convertToInt(connector)).bootStatus) { + // Send status in template after reset + await this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).bootStatus); + } else if (!this._hasStopped && this.getConnector(Utils.convertToInt(connector)).status) { + // Send previous status at template reload + await this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).status); } else { - this.sendStatusNotification(Utils.convertToInt(connector), ChargePointStatus.CHARGING); + // Send default status + await this.sendStatusNotification(Utils.convertToInt(connector), ChargePointStatus.AVAILABLE); } } // Start the ATG @@ -363,6 +368,8 @@ export default class ChargingStation { } async _stopMessageSequence(reason: StopTransactionReason = StopTransactionReason.NONE): Promise { + // Stop WebSocket ping + this._stopWebSocketPing(); // Stop heartbeat this._stopHeartbeat(); // Stop the ATG @@ -379,14 +386,46 @@ export default class ChargingStation { } } + _startWebSocketPing(): void { + const webSocketPingInterval: number = this._getConfigurationKey('WebSocketPingInterval') ? Utils.convertToInt(this._getConfigurationKey('WebSocketPingInterval').value) : 0; + if (webSocketPingInterval > 0 && !this._webSocketPingSetInterval) { + this._webSocketPingSetInterval = setInterval(() => { + if (this._wsConnection?.readyState === WebSocket.OPEN) { + this._wsConnection.ping((): void => {}); + } + }, webSocketPingInterval * 1000); + logger.info(this._logPrefix() + ' WebSocket ping started every ' + Utils.secondsToHHMMSS(webSocketPingInterval)); + } else if (this._webSocketPingSetInterval) { + logger.info(this._logPrefix() + ' WebSocket ping every ' + Utils.secondsToHHMMSS(webSocketPingInterval) + ' already started'); + } else { + logger.error(`${this._logPrefix()} WebSocket ping interval set to ${webSocketPingInterval ? Utils.secondsToHHMMSS(webSocketPingInterval) : webSocketPingInterval}, not starting the WebSocket ping`); + } + } + + _stopWebSocketPing(): void { + if (this._webSocketPingSetInterval) { + clearInterval(this._webSocketPingSetInterval); + this._webSocketPingSetInterval = null; + } + } + + _restartWebSocketPing(): void { + // Stop WebSocket ping + this._stopWebSocketPing(); + // Start WebSocket ping + this._startWebSocketPing(); + } + _startHeartbeat(): void { if (this._heartbeatInterval && this._heartbeatInterval > 0 && !this._heartbeatSetInterval) { - this._heartbeatSetInterval = setInterval(() => { - this.sendHeartbeat(); + this._heartbeatSetInterval = setInterval(async () => { + await this.sendHeartbeat(); }, this._heartbeatInterval); logger.info(this._logPrefix() + ' Heartbeat started every ' + Utils.milliSecondsToHHMMSS(this._heartbeatInterval)); + } else if (this._heartbeatSetInterval) { + logger.info(this._logPrefix() + ' Heartbeat every ' + Utils.milliSecondsToHHMMSS(this._heartbeatInterval) + ' already started'); } else { - logger.error(`${this._logPrefix()} Heartbeat interval set to ${Utils.milliSecondsToHHMMSS(this._heartbeatInterval)}, not starting the heartbeat`); + logger.error(`${this._logPrefix()} Heartbeat interval set to ${this._heartbeatInterval ? Utils.milliSecondsToHHMMSS(this._heartbeatInterval) : this._heartbeatInterval}, not starting the heartbeat`); } } @@ -397,6 +436,13 @@ export default class ChargingStation { } } + _restartHeartbeat(): void { + // Stop heartbeat + this._stopHeartbeat(); + // Start heartbeat + this._startHeartbeat(); + } + _startAuthorizationFileMonitoring(): void { // eslint-disable-next-line @typescript-eslint/no-unused-vars fs.watchFile(this._getAuthorizationFile(), (current, previous) => { @@ -421,6 +467,7 @@ export default class ChargingStation { this._automaticTransactionGeneration) { this._automaticTransactionGeneration.stop().catch(() => { }); } + // FIXME?: restart heartbeat and WebSocket ping when their interval values have changed } catch (error) { logger.error(this._logPrefix() + ' Charging station template file monitoring error: %j', error); } @@ -452,12 +499,13 @@ export default class ChargingStation { } } - start(): void { - if (!this._wsConnectionUrl) { - this._wsConnectionUrl = this._supervisionUrl + '/' + this._stationInfo.name; - } + _openWSConnection(): void { this._wsConnection = new WebSocket(this._wsConnectionUrl, 'ocpp' + Constants.OCPP_VERSION_16); logger.info(this._logPrefix() + ' Will communicate through URL ' + this._supervisionUrl); + } + + start(): void { + this._openWSConnection(); // Monitor authorization file this._startAuthorizationFileMonitoring(); // Monitor station template file @@ -472,16 +520,18 @@ export default class ChargingStation { this._wsConnection.on('open', this.onOpen.bind(this)); // Handle Socket ping this._wsConnection.on('ping', this.onPing.bind(this)); + // Handle Socket pong + this._wsConnection.on('pong', this.onPong.bind(this)); } async stop(reason: StopTransactionReason = StopTransactionReason.NONE): Promise { - // Stop + // Stop message sequence await this._stopMessageSequence(reason); // eslint-disable-next-line guard-for-in for (const connector in this._connectors) { await this.sendStatusNotification(Utils.convertToInt(connector), ChargePointStatus.UNAVAILABLE); } - if (this._wsConnection && this._wsConnection.readyState === WebSocket.OPEN) { + if (this._wsConnection?.readyState === WebSocket.OPEN) { this._wsConnection.close(); } this._hasStopped = true; @@ -489,6 +539,8 @@ export default class ChargingStation { _reconnect(error): void { logger.error(this._logPrefix() + ' Socket: abnormally closed: %j', error); + // Stop heartbeat + this._stopHeartbeat(); // Stop the ATG if needed if (this._stationInfo.AutomaticTransactionGenerator.enable && this._stationInfo.AutomaticTransactionGenerator.stopOnConnectionFailure && @@ -496,44 +548,42 @@ export default class ChargingStation { !this._automaticTransactionGeneration.timeToStop) { this._automaticTransactionGeneration.stop().catch(() => { }); } - // Stop heartbeat - this._stopHeartbeat(); if (this._autoReconnectTimeout !== 0 && (this._autoReconnectRetryCount < this._autoReconnectMaxRetries || this._autoReconnectMaxRetries === -1)) { logger.error(`${this._logPrefix()} Socket: connection retry with timeout ${this._autoReconnectTimeout}ms`); this._autoReconnectRetryCount++; setTimeout(() => { logger.error(this._logPrefix() + ' Socket: reconnecting try #' + this._autoReconnectRetryCount.toString()); - this.start(); + this._openWSConnection(); }, this._autoReconnectTimeout); } else if (this._autoReconnectTimeout !== 0 || this._autoReconnectMaxRetries !== -1) { logger.error(`${this._logPrefix()} Socket: max retries reached (${this._autoReconnectRetryCount}) or retry disabled (${this._autoReconnectTimeout})`); } } - onOpen(): void { + async onOpen(): Promise { logger.info(`${this._logPrefix()} Is connected to server through ${this._wsConnectionUrl}`); - if (!this._hasSocketRestarted) { + if (!this._hasSocketRestarted || this._hasStopped) { // Send BootNotification - this.sendBootNotification(); + await this.sendBootNotification(); } + await this._startMessageSequence(); if (this._hasSocketRestarted) { - this._startMessageSequence(); if (!Utils.isEmptyArray(this._messageQueue)) { this._messageQueue.forEach((message, index) => { - if (this._wsConnection && this._wsConnection.readyState === WebSocket.OPEN) { + if (this._wsConnection?.readyState === WebSocket.OPEN) { this._messageQueue.splice(index, 1); this._wsConnection.send(message); } }); } } - this._autoReconnectRetryCount = 0; this._hasSocketRestarted = false; + this._autoReconnectRetryCount = 0; } onError(errorEvent): void { - switch (errorEvent) { + switch (errorEvent.code) { case 'ECONNREFUSED': this._hasSocketRestarted = true; this._reconnect(errorEvent); @@ -562,11 +612,15 @@ export default class ChargingStation { logger.debug(this._logPrefix() + ' Has received a WS ping (rfc6455) from the server'); } - async onMessage(messageEvent): Promise { + onPong(): void { + logger.debug(this._logPrefix() + ' Has received a WS pong (rfc6455) from the server'); + } + + async onMessage(messageEvent: MessageEvent): Promise { let [messageType, messageId, commandName, commandPayload, errorDetails] = [0, '', Constants.ENTITY_CHARGING_STATION, '', '']; try { // Parse the message - [messageType, messageId, commandName, commandPayload, errorDetails] = JSON.parse(messageEvent); + [messageType, messageId, commandName, commandPayload, errorDetails] = JSON.parse(messageEvent.toString()); // Check the Type of message switch (messageType) { @@ -622,7 +676,7 @@ export default class ChargingStation { // Log logger.error('%s Incoming message %j processing error %s on request content type %s', this._logPrefix(), messageEvent, error, this._requests[messageId]); // Send error - await this.sendError(messageId, error, commandName); + messageType !== Constants.OCPP_JSON_CALL_ERROR_MESSAGE && await this.sendError(messageId, error, commandName); } } @@ -953,7 +1007,7 @@ export default class ChargingStation { break; } // Check if wsConnection is ready - if (this._wsConnection && this._wsConnection.readyState === WebSocket.OPEN) { + if (this._wsConnection?.readyState === WebSocket.OPEN) { if (this.getEnableStatistics()) { this._statistics.addMessage(commandName, messageType); } @@ -974,7 +1028,7 @@ export default class ChargingStation { this._messageQueue.push(messageToSend); } // Reject it - return rejectCallback(new OCPPError(commandParams.code ? commandParams.code : Constants.OCPP_ERROR_GENERIC_ERROR, commandParams.message ? commandParams.message : `Web socket closed for message id '${messageId}' with content '${messageToSend}', message buffered`, commandParams.details ? commandParams.details : {})); + return rejectCallback(new OCPPError(commandParams.code ? commandParams.code : Constants.OCPP_ERROR_GENERIC_ERROR, commandParams.message ? commandParams.message : `WebSocket closed for message id '${messageId}' with content '${messageToSend}', message buffered`, commandParams.details ? commandParams.details : {})); } // Response? if (messageType === Constants.OCPP_JSON_CALL_RESULT_MESSAGE) { @@ -1021,10 +1075,10 @@ export default class ChargingStation { handleResponseBootNotification(payload, requestPayload): void { if (payload.status === 'Accepted') { - this._heartbeatInterval = payload.interval * 1000; + this._heartbeatInterval = Utils.convertToInt(payload.interval) * 1000; + this._heartbeatSetInterval ? this._restartHeartbeat() : this._startHeartbeat(); this._addConfigurationKey('HeartBeatInterval', payload.interval); this._addConfigurationKey('HeartbeatInterval', payload.interval, false, false); - this._startMessageSequence(); this._hasStopped && (this._hasStopped = false); } else if (payload.status === 'Pending') { logger.info(this._logPrefix() + ' Charging station in pending state on the central server'); @@ -1248,22 +1302,26 @@ export default class ChargingStation { return Constants.OCPP_CONFIGURATION_RESPONSE_REJECTED; } else if (keyToChange && !keyToChange.readonly) { const keyIndex = this._configuration.configurationKey.indexOf(keyToChange); - this._configuration.configurationKey[keyIndex].value = commandPayload.value; + let valueChanged = false; + if (this._configuration.configurationKey[keyIndex].value !== commandPayload.value) { + this._configuration.configurationKey[keyIndex].value = commandPayload.value as string; + valueChanged = true; + } let triggerHeartbeatRestart = false; - if (keyToChange.key === 'HeartBeatInterval') { + if (keyToChange.key === 'HeartBeatInterval' && valueChanged) { this._setConfigurationKeyValue('HeartbeatInterval', commandPayload.value); triggerHeartbeatRestart = true; } - if (keyToChange.key === 'HeartbeatInterval') { + if (keyToChange.key === 'HeartbeatInterval' && valueChanged) { this._setConfigurationKeyValue('HeartBeatInterval', commandPayload.value); triggerHeartbeatRestart = true; } if (triggerHeartbeatRestart) { this._heartbeatInterval = Utils.convertToInt(commandPayload.value) * 1000; - // Stop heartbeat - this._stopHeartbeat(); - // Start heartbeat - this._startHeartbeat(); + this._restartHeartbeat(); + } + if (keyToChange.key === 'WebSocketPingInterval' && valueChanged) { + this._restartWebSocketPing(); } if (keyToChange.reboot) { return Constants.OCPP_CONFIGURATION_RESPONSE_REBOOT_REQUIRED; diff --git a/src/utils/Constants.ts b/src/utils/Constants.ts index cf5d9bfa..56652b37 100644 --- a/src/utils/Constants.ts +++ b/src/utils/Constants.ts @@ -50,7 +50,7 @@ export default class Constants { static readonly CHARGING_STATION_DEFAULT_RESET_TIME = 60000; // Ms static readonly CHARGING_STATION_ATG_WAIT_TIME = 2000; // Ms - static readonly MAXIMUM_MEASUREMENTS_NUMBER = 5000; + static readonly MAXIMUM_MEASUREMENTS_NUMBER = 2000; static readonly TRANSACTION_DEFAULT_IDTAG = '00000000'; } diff --git a/src/utils/Statistics.ts b/src/utils/Statistics.ts index d687368c..a7dfaaa7 100644 --- a/src/utils/Statistics.ts +++ b/src/utils/Statistics.ts @@ -74,14 +74,18 @@ export default class Statistics { perfEntry.entryType = entry.entryType; perfEntry.startTime = entry.startTime; perfEntry.duration = entry.duration; - logger.info(`${this._logPrefix()} object ${className} method performance entry: %j`, perfEntry); + logger.info(`${this._logPrefix()} object ${className} method(s) performance entry: %j`, perfEntry); } - _display(): void { + start(): void { + this._displayInterval(); + } + + private _display(): void { logger.info(this._logPrefix() + ' %j', this._commandsStatistics); } - _displayInterval(): void { + private _displayInterval(): void { if (Configuration.getStatisticsDisplayInterval() > 0) { setInterval(() => { this._display(); @@ -90,10 +94,6 @@ export default class Statistics { } } - start(): void { - this._displayInterval(); - } - private median(dataSet: number[]): number { if (Array.isArray(dataSet) && dataSet.length === 1) { return dataSet[0]; -- 2.34.1