import Connectors, { Connector } from '../types/Connectors';
import MeterValue, { MeterValueLocation, MeterValueMeasurand, MeterValuePhase, MeterValueUnit } from '../types/ocpp/1.6/MeterValue';
import { PerformanceObserver, performance } from 'perf_hooks';
+import WebSocket, { MessageEvent } from 'ws';
import AutomaticTransactionGenerator from './AutomaticTransactionGenerator';
import { ChargePointErrorCode } from '../types/ocpp/1.6/ChargePointErrorCode';
import Requests from '../types/ocpp/1.6/Requests';
import Statistics from '../utils/Statistics';
import Utils from '../utils/Utils';
-import WebSocket from 'ws';
import crypto from 'crypto';
import fs from 'fs';
import logger from '../utils/Logger';
private _authorizedTags: string[];
private _heartbeatInterval: number;
private _heartbeatSetInterval: NodeJS.Timeout;
+ private _webSocketPingSetInterval: NodeJS.Timeout;
private _statistics: Statistics;
private _performanceObserver: PerformanceObserver;
...!Utils.isUndefined(this._stationInfo.chargeBoxSerialNumberPrefix) && { chargeBoxSerialNumber: this._stationInfo.chargeBoxSerialNumberPrefix },
...!Utils.isUndefined(this._stationInfo.firmwareVersion) && { firmwareVersion: this._stationInfo.firmwareVersion },
};
- this._configuration = this._getConfiguration();
+ this._configuration = this._getTemplateChargingStationConfiguration();
this._supervisionUrl = this._getSupervisionURL();
this._wsConnectionUrl = this._supervisionUrl + '/' + this._stationInfo.name;
// Build connectors if needed
return Utils.logPrefix(` ${this._stationInfo.name}:`);
}
- _getConfiguration(): ChargingStationConfiguration {
+ _getTemplateChargingStationConfiguration(): ChargingStationConfiguration {
return this._stationInfo.Configuration ? this._stationInfo.Configuration : {} as ChargingStationConfiguration;
}
return localAuthListEnabled ? Utils.convertToBoolean(localAuthListEnabled.value) : false;
}
- _startMessageSequence(): void {
+ async _startMessageSequence(): Promise<void> {
+ // Start WebSocket ping
+ this._startWebSocketPing();
// Start heartbeat
this._startHeartbeat();
// Initialize connectors status
for (const connector in this._connectors) {
- if (!this.getConnector(Utils.convertToInt(connector)).transactionStarted) {
- if (!this.getConnector(Utils.convertToInt(connector)).status && this.getConnector(Utils.convertToInt(connector)).bootStatus) {
- this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).bootStatus);
- } else if (!this._hasStopped && this.getConnector(Utils.convertToInt(connector)).status) {
- this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).status);
- } else {
- this.sendStatusNotification(Utils.convertToInt(connector), ChargePointStatus.AVAILABLE);
- }
+ if (!this._hasStopped && !this.getConnector(Utils.convertToInt(connector)).status && this.getConnector(Utils.convertToInt(connector)).bootStatus) {
+ // Send status in template at startup
+ await this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).bootStatus);
+ } else if (this._hasStopped && this.getConnector(Utils.convertToInt(connector)).bootStatus) {
+ // Send status in template after reset
+ await this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).bootStatus);
+ } else if (!this._hasStopped && this.getConnector(Utils.convertToInt(connector)).status) {
+ // Send previous status at template reload
+ await this.sendStatusNotification(Utils.convertToInt(connector), this.getConnector(Utils.convertToInt(connector)).status);
} else {
- this.sendStatusNotification(Utils.convertToInt(connector), ChargePointStatus.CHARGING);
+ // Send default status
+ await this.sendStatusNotification(Utils.convertToInt(connector), ChargePointStatus.AVAILABLE);
}
}
// Start the ATG
}
async _stopMessageSequence(reason: StopTransactionReason = StopTransactionReason.NONE): Promise<void> {
+ // Stop WebSocket ping
+ this._stopWebSocketPing();
// Stop heartbeat
this._stopHeartbeat();
// Stop the ATG
}
}
+ _startWebSocketPing(): void {
+ const webSocketPingInterval: number = this._getConfigurationKey('WebSocketPingInterval') ? Utils.convertToInt(this._getConfigurationKey('WebSocketPingInterval').value) : 0;
+ if (webSocketPingInterval > 0 && !this._webSocketPingSetInterval) {
+ this._webSocketPingSetInterval = setInterval(() => {
+ if (this._wsConnection?.readyState === WebSocket.OPEN) {
+ this._wsConnection.ping((): void => {});
+ }
+ }, webSocketPingInterval * 1000);
+ logger.info(this._logPrefix() + ' WebSocket ping started every ' + Utils.secondsToHHMMSS(webSocketPingInterval));
+ } else if (this._webSocketPingSetInterval) {
+ logger.info(this._logPrefix() + ' WebSocket ping every ' + Utils.secondsToHHMMSS(webSocketPingInterval) + ' already started');
+ } else {
+ logger.error(`${this._logPrefix()} WebSocket ping interval set to ${webSocketPingInterval ? Utils.secondsToHHMMSS(webSocketPingInterval) : webSocketPingInterval}, not starting the WebSocket ping`);
+ }
+ }
+
+ _stopWebSocketPing(): void {
+ if (this._webSocketPingSetInterval) {
+ clearInterval(this._webSocketPingSetInterval);
+ this._webSocketPingSetInterval = null;
+ }
+ }
+
+ _restartWebSocketPing(): void {
+ // Stop WebSocket ping
+ this._stopWebSocketPing();
+ // Start WebSocket ping
+ this._startWebSocketPing();
+ }
+
_startHeartbeat(): void {
if (this._heartbeatInterval && this._heartbeatInterval > 0 && !this._heartbeatSetInterval) {
- this._heartbeatSetInterval = setInterval(() => {
- this.sendHeartbeat();
+ this._heartbeatSetInterval = setInterval(async () => {
+ await this.sendHeartbeat();
}, this._heartbeatInterval);
logger.info(this._logPrefix() + ' Heartbeat started every ' + Utils.milliSecondsToHHMMSS(this._heartbeatInterval));
+ } else if (this._heartbeatSetInterval) {
+ logger.info(this._logPrefix() + ' Heartbeat every ' + Utils.milliSecondsToHHMMSS(this._heartbeatInterval) + ' already started');
} else {
- logger.error(`${this._logPrefix()} Heartbeat interval set to ${Utils.milliSecondsToHHMMSS(this._heartbeatInterval)}, not starting the heartbeat`);
+ logger.error(`${this._logPrefix()} Heartbeat interval set to ${this._heartbeatInterval ? Utils.milliSecondsToHHMMSS(this._heartbeatInterval) : this._heartbeatInterval}, not starting the heartbeat`);
}
}
}
}
+ _restartHeartbeat(): void {
+ // Stop heartbeat
+ this._stopHeartbeat();
+ // Start heartbeat
+ this._startHeartbeat();
+ }
+
_startAuthorizationFileMonitoring(): void {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
fs.watchFile(this._getAuthorizationFile(), (current, previous) => {
this._automaticTransactionGeneration) {
this._automaticTransactionGeneration.stop().catch(() => { });
}
+ // FIXME?: restart heartbeat and WebSocket ping when their interval values have changed
} catch (error) {
logger.error(this._logPrefix() + ' Charging station template file monitoring error: %j', error);
}
}
}
- start(): void {
- if (!this._wsConnectionUrl) {
- this._wsConnectionUrl = this._supervisionUrl + '/' + this._stationInfo.name;
- }
+ _openWSConnection(): void {
this._wsConnection = new WebSocket(this._wsConnectionUrl, 'ocpp' + Constants.OCPP_VERSION_16);
logger.info(this._logPrefix() + ' Will communicate through URL ' + this._supervisionUrl);
+ }
+
+ start(): void {
+ this._openWSConnection();
// Monitor authorization file
this._startAuthorizationFileMonitoring();
// Monitor station template file
this._wsConnection.on('open', this.onOpen.bind(this));
// Handle Socket ping
this._wsConnection.on('ping', this.onPing.bind(this));
+ // Handle Socket pong
+ this._wsConnection.on('pong', this.onPong.bind(this));
}
async stop(reason: StopTransactionReason = StopTransactionReason.NONE): Promise<void> {
- // Stop
+ // Stop message sequence
await this._stopMessageSequence(reason);
// eslint-disable-next-line guard-for-in
for (const connector in this._connectors) {
await this.sendStatusNotification(Utils.convertToInt(connector), ChargePointStatus.UNAVAILABLE);
}
- if (this._wsConnection && this._wsConnection.readyState === WebSocket.OPEN) {
+ if (this._wsConnection?.readyState === WebSocket.OPEN) {
this._wsConnection.close();
}
this._hasStopped = true;
_reconnect(error): void {
logger.error(this._logPrefix() + ' Socket: abnormally closed: %j', error);
+ // Stop heartbeat
+ this._stopHeartbeat();
// Stop the ATG if needed
if (this._stationInfo.AutomaticTransactionGenerator.enable &&
this._stationInfo.AutomaticTransactionGenerator.stopOnConnectionFailure &&
!this._automaticTransactionGeneration.timeToStop) {
this._automaticTransactionGeneration.stop().catch(() => { });
}
- // Stop heartbeat
- this._stopHeartbeat();
if (this._autoReconnectTimeout !== 0 &&
(this._autoReconnectRetryCount < this._autoReconnectMaxRetries || this._autoReconnectMaxRetries === -1)) {
logger.error(`${this._logPrefix()} Socket: connection retry with timeout ${this._autoReconnectTimeout}ms`);
this._autoReconnectRetryCount++;
setTimeout(() => {
logger.error(this._logPrefix() + ' Socket: reconnecting try #' + this._autoReconnectRetryCount.toString());
- this.start();
+ this._openWSConnection();
}, this._autoReconnectTimeout);
} else if (this._autoReconnectTimeout !== 0 || this._autoReconnectMaxRetries !== -1) {
logger.error(`${this._logPrefix()} Socket: max retries reached (${this._autoReconnectRetryCount}) or retry disabled (${this._autoReconnectTimeout})`);
}
}
- onOpen(): void {
+ async onOpen(): Promise<void> {
logger.info(`${this._logPrefix()} Is connected to server through ${this._wsConnectionUrl}`);
- if (!this._hasSocketRestarted) {
+ if (!this._hasSocketRestarted || this._hasStopped) {
// Send BootNotification
- this.sendBootNotification();
+ await this.sendBootNotification();
}
+ await this._startMessageSequence();
if (this._hasSocketRestarted) {
- this._startMessageSequence();
if (!Utils.isEmptyArray(this._messageQueue)) {
this._messageQueue.forEach((message, index) => {
- if (this._wsConnection && this._wsConnection.readyState === WebSocket.OPEN) {
+ if (this._wsConnection?.readyState === WebSocket.OPEN) {
this._messageQueue.splice(index, 1);
this._wsConnection.send(message);
}
});
}
}
- this._autoReconnectRetryCount = 0;
this._hasSocketRestarted = false;
+ this._autoReconnectRetryCount = 0;
}
onError(errorEvent): void {
- switch (errorEvent) {
+ switch (errorEvent.code) {
case 'ECONNREFUSED':
this._hasSocketRestarted = true;
this._reconnect(errorEvent);
logger.debug(this._logPrefix() + ' Has received a WS ping (rfc6455) from the server');
}
- async onMessage(messageEvent): Promise<void> {
+ onPong(): void {
+ logger.debug(this._logPrefix() + ' Has received a WS pong (rfc6455) from the server');
+ }
+
+ async onMessage(messageEvent: MessageEvent): Promise<void> {
let [messageType, messageId, commandName, commandPayload, errorDetails] = [0, '', Constants.ENTITY_CHARGING_STATION, '', ''];
try {
// Parse the message
- [messageType, messageId, commandName, commandPayload, errorDetails] = JSON.parse(messageEvent);
+ [messageType, messageId, commandName, commandPayload, errorDetails] = JSON.parse(messageEvent.toString());
// Check the Type of message
switch (messageType) {
// Log
logger.error('%s Incoming message %j processing error %s on request content type %s', this._logPrefix(), messageEvent, error, this._requests[messageId]);
// Send error
- await this.sendError(messageId, error, commandName);
+ messageType !== Constants.OCPP_JSON_CALL_ERROR_MESSAGE && await this.sendError(messageId, error, commandName);
}
}
break;
}
// Check if wsConnection is ready
- if (this._wsConnection && this._wsConnection.readyState === WebSocket.OPEN) {
+ if (this._wsConnection?.readyState === WebSocket.OPEN) {
if (this.getEnableStatistics()) {
this._statistics.addMessage(commandName, messageType);
}
this._messageQueue.push(messageToSend);
}
// Reject it
- return rejectCallback(new OCPPError(commandParams.code ? commandParams.code : Constants.OCPP_ERROR_GENERIC_ERROR, commandParams.message ? commandParams.message : `Web socket closed for message id '${messageId}' with content '${messageToSend}', message buffered`, commandParams.details ? commandParams.details : {}));
+ return rejectCallback(new OCPPError(commandParams.code ? commandParams.code : Constants.OCPP_ERROR_GENERIC_ERROR, commandParams.message ? commandParams.message : `WebSocket closed for message id '${messageId}' with content '${messageToSend}', message buffered`, commandParams.details ? commandParams.details : {}));
}
// Response?
if (messageType === Constants.OCPP_JSON_CALL_RESULT_MESSAGE) {
handleResponseBootNotification(payload, requestPayload): void {
if (payload.status === 'Accepted') {
- this._heartbeatInterval = payload.interval * 1000;
+ this._heartbeatInterval = Utils.convertToInt(payload.interval) * 1000;
+ this._heartbeatSetInterval ? this._restartHeartbeat() : this._startHeartbeat();
this._addConfigurationKey('HeartBeatInterval', payload.interval);
this._addConfigurationKey('HeartbeatInterval', payload.interval, false, false);
- this._startMessageSequence();
this._hasStopped && (this._hasStopped = false);
} else if (payload.status === 'Pending') {
logger.info(this._logPrefix() + ' Charging station in pending state on the central server');
return Constants.OCPP_CONFIGURATION_RESPONSE_REJECTED;
} else if (keyToChange && !keyToChange.readonly) {
const keyIndex = this._configuration.configurationKey.indexOf(keyToChange);
- this._configuration.configurationKey[keyIndex].value = commandPayload.value;
+ let valueChanged = false;
+ if (this._configuration.configurationKey[keyIndex].value !== commandPayload.value) {
+ this._configuration.configurationKey[keyIndex].value = commandPayload.value as string;
+ valueChanged = true;
+ }
let triggerHeartbeatRestart = false;
- if (keyToChange.key === 'HeartBeatInterval') {
+ if (keyToChange.key === 'HeartBeatInterval' && valueChanged) {
this._setConfigurationKeyValue('HeartbeatInterval', commandPayload.value);
triggerHeartbeatRestart = true;
}
- if (keyToChange.key === 'HeartbeatInterval') {
+ if (keyToChange.key === 'HeartbeatInterval' && valueChanged) {
this._setConfigurationKeyValue('HeartBeatInterval', commandPayload.value);
triggerHeartbeatRestart = true;
}
if (triggerHeartbeatRestart) {
this._heartbeatInterval = Utils.convertToInt(commandPayload.value) * 1000;
- // Stop heartbeat
- this._stopHeartbeat();
- // Start heartbeat
- this._startHeartbeat();
+ this._restartHeartbeat();
+ }
+ if (keyToChange.key === 'WebSocketPingInterval' && valueChanged) {
+ this._restartWebSocketPing();
}
if (keyToChange.reboot) {
return Constants.OCPP_CONFIGURATION_RESPONSE_REBOOT_REQUIRED;