From 16b0d4e77d20b5f7004469f2206a0af1ee8984a5 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 11 Sep 2021 21:45:59 +0200 Subject: [PATCH] Use a Map to cache OCPP requests in use MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- package-lock.json | 6 +-- package.json | 2 +- src/charging-station/ChargingStation.ts | 47 ++++++++++--------- .../ocpp/OCPPRequestService.ts | 4 +- src/types/WebSocket.ts | 4 ++ src/types/ocpp/Requests.ts | 4 -- 6 files changed, 35 insertions(+), 32 deletions(-) diff --git a/package-lock.json b/package-lock.json index 353f084b..a7e37489 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19780,9 +19780,9 @@ } }, "typescript": { - "version": "4.4.2", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.4.2.tgz", - "integrity": "sha512-gzP+t5W4hdy4c+68bfcv0t400HVJMMd2+H9B7gae1nQlBzCqvrXX+6GL/b3GAgyTH966pzrZ70/fRjwAtZksSQ==", + "version": "4.4.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.4.3.tgz", + "integrity": "sha512-4xfscpisVgqqDfPaJo5vkd+Qd/ItkoagnHpufr+i2QCHBsNYp+G7UAoyFl8aPtx879u38wPV65rZ8qbGZijalA==", "dev": true }, "ua-parser-js": { diff --git a/package.json b/package.json index f44caf17..4f5aaa42 100644 --- a/package.json +++ b/package.json @@ -121,6 +121,6 @@ "rollup-plugin-terser": "^7.0.2", "rollup-plugin-ts": "^1.4.1", "ts-node": "^10.2.1", - "typescript": "^4.4.2" + "typescript": "^4.4.3" } } diff --git a/src/charging-station/ChargingStation.ts b/src/charging-station/ChargingStation.ts index 6a8df931..2f1bbc3e 100644 --- a/src/charging-station/ChargingStation.ts +++ b/src/charging-station/ChargingStation.ts @@ -1,13 +1,14 @@ // Partial Copyright Jerome Benoit. 2021. All Rights Reserved. +import { AvailabilityType, BootNotificationRequest, IncomingRequest, IncomingRequestCommand, Request } from '../types/ocpp/Requests'; import { BootNotificationResponse, RegistrationStatus } from '../types/ocpp/Responses'; import ChargingStationConfiguration, { ConfigurationKey } from '../types/ChargingStationConfiguration'; import ChargingStationTemplate, { CurrentType, PowerUnits, Voltage } from '../types/ChargingStationTemplate'; import { ConnectorPhaseRotation, StandardParametersKey, SupportedFeatureProfiles } from '../types/ocpp/Configuration'; import Connectors, { Connector, SampledValueTemplate } from '../types/Connectors'; import { MeterValueMeasurand, MeterValuePhase } from '../types/ocpp/MeterValues'; -import Requests, { AvailabilityType, BootNotificationRequest, IncomingRequest, IncomingRequestCommand } from '../types/ocpp/Requests'; -import WebSocket, { ClientOptions, MessageEvent } from 'ws'; +import { WSError, WebSocketCloseEventStatusCode } from '../types/WebSocket'; +import WebSocket, { ClientOptions, Data } from 'ws'; import AutomaticTransactionGenerator from './AutomaticTransactionGenerator'; import { ChargePointStatus } from '../types/ocpp/ChargePointStatus'; @@ -30,7 +31,6 @@ import PerformanceStatistics from '../performance/PerformanceStatistics'; import { StopTransactionReason } from '../types/ocpp/Transaction'; import { URL } from 'url'; import Utils from '../utils/Utils'; -import { WebSocketCloseEventStatusCode } from '../types/WebSocket'; import crypto from 'crypto'; import fs from 'fs'; import logger from '../utils/Logger'; @@ -44,7 +44,7 @@ export default class ChargingStation { public configuration!: ChargingStationConfiguration; public hasStopped: boolean; public wsConnection!: WebSocket; - public requests: Requests; + public requests: Map; public messageQueue: string[]; public performanceStatistics!: PerformanceStatistics; public heartbeatSetInterval!: NodeJS.Timeout; @@ -70,8 +70,8 @@ export default class ChargingStation { this.hasSocketRestarted = false; this.autoReconnectRetryCount = 0; - this.requests = {} as Requests; - this.messageQueue = [] as string[]; + this.requests = new Map(); + this.messageQueue = new Array(); this.authorizedTags = this.getAuthorizedTags(); } @@ -236,7 +236,6 @@ export default class ChargingStation { for (let index = 0; !Utils.isEmptyArray(sampledValueTemplates) && index < sampledValueTemplates.length; index++) { if (!Constants.SUPPORTED_MEASURANDS.includes(sampledValueTemplates[index]?.measurand ?? MeterValueMeasurand.ENERGY_ACTIVE_IMPORT_REGISTER)) { logger.warn(`${this.logPrefix()} Unsupported MeterValues measurand ${measurand} ${phase ? `on phase ${phase} ` : ''}in template on connectorId ${connectorId}`); - continue; } else if (phase && sampledValueTemplates[index]?.phase === phase && sampledValueTemplates[index]?.measurand === measurand && this.getConfigurationKey(StandardParametersKey.MeterValuesSampledData).value.includes(measurand)) { return sampledValueTemplates[index]; @@ -646,14 +645,15 @@ export default class ChargingStation { } } - private async onMessage(messageEvent: MessageEvent): Promise { + private async onMessage(data: Data): Promise { let [messageType, messageId, commandName, commandPayload, errorDetails]: IncomingRequest = [0, '', '' as IncomingRequestCommand, {}, {}]; let responseCallback: (payload: Record | string, requestPayload: Record) => void; let rejectCallback: (error: OCPPError) => void; let requestPayload: Record; + let cachedRequest: Request; let errMsg: string; try { - const request = JSON.parse(messageEvent.toString()) as IncomingRequest; + const request = JSON.parse(data.toString()) as IncomingRequest; if (Utils.isIterable(request)) { // Parse the message [messageType, messageId, commandName, commandPayload, errorDetails] = request; @@ -673,8 +673,9 @@ export default class ChargingStation { // Outcome Message case MessageType.CALL_RESULT_MESSAGE: // Respond - if (Utils.isIterable(this.requests[messageId])) { - [responseCallback, , requestPayload] = this.requests[messageId]; + cachedRequest = this.requests.get(messageId); + if (Utils.isIterable(cachedRequest)) { + [responseCallback, , requestPayload] = cachedRequest; } else { throw new OCPPError(ErrorType.PROTOCOL_ERROR, `Response request for message id ${messageId} is not iterable`, commandName); } @@ -682,21 +683,22 @@ export default class ChargingStation { // Error throw new OCPPError(ErrorType.INTERNAL_ERROR, `Response request for unknown message id ${messageId}`, commandName); } - delete this.requests[messageId]; + this.requests.delete(messageId); responseCallback(commandName, requestPayload); break; // Error Message case MessageType.CALL_ERROR_MESSAGE: - if (!this.requests[messageId]) { + cachedRequest = this.requests.get(messageId); + if (!cachedRequest) { // Error throw new OCPPError(ErrorType.INTERNAL_ERROR, `Error request for unknown message id ${messageId}`); } - if (Utils.isIterable(this.requests[messageId])) { - [, rejectCallback] = this.requests[messageId]; + if (Utils.isIterable(cachedRequest)) { + [, rejectCallback] = cachedRequest; } else { throw new OCPPError(ErrorType.PROTOCOL_ERROR, `Error request for message id ${messageId} is not iterable`); } - delete this.requests[messageId]; + this.requests.delete(messageId); rejectCallback(new OCPPError(commandName, commandPayload.toString(), null, errorDetails)); break; // Error @@ -707,7 +709,7 @@ export default class ChargingStation { } } catch (error) { // Log - logger.error('%s Incoming request message %j processing error %j on content type %j', this.logPrefix(), messageEvent, error, this.requests[messageId]); + logger.error('%s Incoming request message %j processing error %j on content type %j', this.logPrefix(), data, error, this.requests.get(messageId)); // Send error messageType !== MessageType.CALL_ERROR_MESSAGE && await this.ocppRequestService.sendError(messageId, error, commandName); } @@ -721,11 +723,11 @@ export default class ChargingStation { logger.debug(this.logPrefix() + ' Received a WS pong (rfc6455) from the server'); } - private async onError(errorEvent: any): Promise { - logger.error(this.logPrefix() + ' Socket error: %j', errorEvent); - // switch (errorEvent.code) { + private async onError(error: WSError): Promise { + logger.error(this.logPrefix() + ' Socket error: %j', error); + // switch (error.code) { // case 'ECONNREFUSED': - // await this._reconnect(errorEvent); + // await this.reconnect(error); // break; // } } @@ -985,6 +987,7 @@ export default class ChargingStation { logger.debug(this.logPrefix() + ' Authorization file ' + authorizationFile + ' have changed, reload'); // Initialize authorizedTags this.authorizedTags = this.getAuthorizedTags(); + console.log('here'); } catch (error) { logger.error(this.logPrefix() + ' Authorization file monitoring error: %j', error); } @@ -1030,7 +1033,7 @@ export default class ChargingStation { return !Utils.isUndefined(this.stationInfo.reconnectExponentialDelay) ? this.stationInfo.reconnectExponentialDelay : false; } - private async reconnect(error: unknown): Promise { + private async reconnect(error: Error): Promise { // Stop WebSocket ping this.stopWebSocketPing(); // Stop heartbeat diff --git a/src/charging-station/ocpp/OCPPRequestService.ts b/src/charging-station/ocpp/OCPPRequestService.ts index 7b20343a..f8956706 100644 --- a/src/charging-station/ocpp/OCPPRequestService.ts +++ b/src/charging-station/ocpp/OCPPRequestService.ts @@ -35,7 +35,7 @@ export default abstract class OCPPRequestService { // Request case MessageType.CALL_MESSAGE: // Build request - this.chargingStation.requests[messageId] = [responseCallback, rejectCallback, commandParams as Record]; + this.chargingStation.requests.set(messageId, [responseCallback, rejectCallback, commandParams as Record]); messageToSend = JSON.stringify([messageType, messageId, commandName, commandParams]); break; // Response @@ -100,7 +100,7 @@ export default abstract class OCPPRequestService { logger.debug(`${self.chargingStation.logPrefix()} Error: %j occurred when calling command %s with parameters: %j`, error, commandName, commandParams); // Build Exception // eslint-disable-next-line no-empty-function - self.chargingStation.requests[messageId] = [() => { }, () => { }, {}]; + self.chargingStation.requests.set(messageId, [() => { }, () => { }, {}]); // Send error reject(error); } diff --git a/src/types/WebSocket.ts b/src/types/WebSocket.ts index 8fedf7b4..60b2f681 100644 --- a/src/types/WebSocket.ts +++ b/src/types/WebSocket.ts @@ -35,3 +35,7 @@ export enum WebSocketCloseEventStatusCode { CLOSE_BAD_GATEWAY = 1014, CLOSE_TLS_HANDSHAKE = 1015 } + +export interface WSError extends Error { + code?: string +} diff --git a/src/types/ocpp/Requests.ts b/src/types/ocpp/Requests.ts index 89f83665..713fa0e6 100644 --- a/src/types/ocpp/Requests.ts +++ b/src/types/ocpp/Requests.ts @@ -4,10 +4,6 @@ import { MessageType } from './MessageType'; import { OCPP16DiagnosticsStatus } from './1.6/DiagnosticsStatus'; import OCPPError from '../../charging-station/ocpp/OCPPError'; -export default interface Requests { - [id: string]: Request; -} - export type BootNotificationRequest = OCPP16BootNotificationRequest; export type AvailabilityType = OCPP16AvailabilityType; -- 2.34.1