X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fcharging-station%2FChargingStation.ts;h=f74cf4bd89906bc6218d648d967e3e12913ef8b8;hb=f0227f390d3f6e303bc54a96c16c075e9de46ad6;hp=dd7cac5a4e51446d543affaa513db6fe906b140b;hpb=32de5a575189d226213641f5ee36004f8454cb50;p=e-mobility-charging-stations-simulator.git diff --git a/src/charging-station/ChargingStation.ts b/src/charging-station/ChargingStation.ts index dd7cac5a..f74cf4bd 100644 --- a/src/charging-station/ChargingStation.ts +++ b/src/charging-station/ChargingStation.ts @@ -6,7 +6,7 @@ import path from 'path'; import { URL } from 'url'; import { parentPort } from 'worker_threads'; -import WebSocket, { Data, MessageEvent, RawData } from 'ws'; +import WebSocket, { Data, RawData } from 'ws'; import BaseError from '../exception/BaseError'; import OCPPError from '../exception/OCPPError'; @@ -58,15 +58,11 @@ import { StatusNotificationResponse, } from '../types/ocpp/Responses'; import { - StartTransactionRequest, - StartTransactionResponse, StopTransactionReason, StopTransactionRequest, StopTransactionResponse, } from '../types/ocpp/Transaction'; -import { ProcedureName } from '../types/UIProtocol'; import { WSError, WebSocketCloseEventStatusCode } from '../types/WebSocket'; -import { WorkerBroadcastChannelData } from '../types/WorkerBroadcastChannel'; import Configuration from '../utils/Configuration'; import Constants from '../utils/Constants'; import { ACElectricUtils, DCElectricUtils } from '../utils/ElectricUtils'; @@ -77,6 +73,7 @@ import AuthorizedTagsCache from './AuthorizedTagsCache'; import AutomaticTransactionGenerator from './AutomaticTransactionGenerator'; import { ChargingStationConfigurationUtils } from './ChargingStationConfigurationUtils'; import { ChargingStationUtils } from './ChargingStationUtils'; +import ChargingStationWorkerBroadcastChannel from './ChargingStationWorkerBroadcastChannel'; import { MessageChannelUtils } from './MessageChannelUtils'; import OCPP16IncomingRequestService from './ocpp/1.6/OCPP16IncomingRequestService'; import OCPP16RequestService from './ocpp/1.6/OCPP16RequestService'; @@ -85,13 +82,13 @@ import { OCPP16ServiceUtils } from './ocpp/1.6/OCPP16ServiceUtils'; import OCPPIncomingRequestService from './ocpp/OCPPIncomingRequestService'; import OCPPRequestService from './ocpp/OCPPRequestService'; import SharedLRUCache from './SharedLRUCache'; -import WorkerBroadcastChannel from './WorkerBroadcastChannel'; export default class ChargingStation { public hashId!: string; public readonly templateFile: string; public authorizedTagsCache: AuthorizedTagsCache; public stationInfo!: ChargingStationInfo; + public stopped: boolean; public readonly connectors: Map; public ocppConfiguration!: ChargingStationOcppConfiguration; public wsConnection!: WebSocket; @@ -111,12 +108,11 @@ export default class ChargingStation { private configuredSupervisionUrl!: URL; private wsConnectionRestarted: boolean; private autoReconnectRetryCount: number; - private stopped: boolean; private templateFileWatcher!: fs.FSWatcher; private readonly sharedLRUCache: SharedLRUCache; private automaticTransactionGenerator!: AutomaticTransactionGenerator; private webSocketPingSetInterval!: NodeJS.Timeout; - private workerBroadcastChannel: WorkerBroadcastChannel; + private readonly chargingStationWorkerBroadcastChannel: ChargingStationWorkerBroadcastChannel; constructor(index: number, templateFile: string) { this.index = index; @@ -129,11 +125,7 @@ export default class ChargingStation { this.connectors = new Map(); this.requests = new Map(); this.messageBuffer = new Set(); - this.workerBroadcastChannel = new WorkerBroadcastChannel(); - - this.workerBroadcastChannel.onmessage = this.handleWorkerBroadcastChannelMessage.bind(this) as ( - message: MessageEvent - ) => void; + this.chargingStationWorkerBroadcastChannel = new ChargingStationWorkerBroadcastChannel(this); this.initialize(); } @@ -344,22 +336,22 @@ export default class ChargingStation { } } - public getEnergyActiveImportRegisterByTransactionId(transactionId: number): number | undefined { + public getEnergyActiveImportRegisterByTransactionId(transactionId: number): number { const transactionConnectorStatus = this.getConnectorStatus( this.getConnectorIdByTransactionId(transactionId) ); if (this.getMeteringPerTransaction()) { - return transactionConnectorStatus?.transactionEnergyActiveImportRegisterValue; + return transactionConnectorStatus?.transactionEnergyActiveImportRegisterValue ?? 0; } - return transactionConnectorStatus?.energyActiveImportRegisterValue; + return transactionConnectorStatus?.energyActiveImportRegisterValue ?? 0; } - public getEnergyActiveImportRegisterByConnectorId(connectorId: number): number | undefined { + public getEnergyActiveImportRegisterByConnectorId(connectorId: number): number { const connectorStatus = this.getConnectorStatus(connectorId); if (this.getMeteringPerTransaction()) { - return connectorStatus?.transactionEnergyActiveImportRegisterValue; + return connectorStatus?.transactionEnergyActiveImportRegisterValue ?? 0; } - return connectorStatus?.energyActiveImportRegisterValue; + return connectorStatus?.energyActiveImportRegisterValue ?? 0; } public getAuthorizeRemoteTxRequests(): boolean { @@ -558,8 +550,8 @@ export default class ChargingStation { this.templateFileWatcher.close(); this.sharedLRUCache.deleteChargingStationTemplate(this.stationInfo?.templateHash); this.bootNotificationResponse = null; - parentPort.postMessage(MessageChannelUtils.buildStoppedMessage(this)); this.stopped = true; + parentPort.postMessage(MessageChannelUtils.buildStoppedMessage(this)); } public async reset(reason?: StopTransactionReason): Promise { @@ -634,7 +626,7 @@ export default class ChargingStation { ); this.getConnectorStatus(connectorId).chargingProfiles = []; } - if (!Array.isArray(this.getConnectorStatus(connectorId).chargingProfiles)) { + if (Array.isArray(this.getConnectorStatus(connectorId).chargingProfiles) === false) { logger.error( `${this.logPrefix()} Trying to set a charging profile on connectorId ${connectorId} with an improper attribute type for the charging profiles array, applying proper type initialization` ); @@ -683,6 +675,74 @@ export default class ChargingStation { this.messageBuffer.add(message); } + public openWSConnection( + options: WsOptions = this.stationInfo?.wsOptions ?? {}, + params: { closeOpened?: boolean; terminateOpened?: boolean } = { + closeOpened: false, + terminateOpened: false, + } + ): void { + options.handshakeTimeout = options?.handshakeTimeout ?? this.getConnectionTimeout() * 1000; + params.closeOpened = params?.closeOpened ?? false; + params.terminateOpened = params?.terminateOpened ?? false; + if ( + !Utils.isNullOrUndefined(this.stationInfo.supervisionUser) && + !Utils.isNullOrUndefined(this.stationInfo.supervisionPassword) + ) { + options.auth = `${this.stationInfo.supervisionUser}:${this.stationInfo.supervisionPassword}`; + } + if (params?.closeOpened) { + this.closeWSConnection(); + } + if (params?.terminateOpened) { + this.terminateWSConnection(); + } + let protocol: string; + switch (this.getOcppVersion()) { + case OCPPVersion.VERSION_16: + protocol = 'ocpp' + OCPPVersion.VERSION_16; + break; + default: + this.handleUnsupportedVersion(this.getOcppVersion()); + break; + } + + logger.info( + this.logPrefix() + ' Open OCPP connection to URL ' + this.wsConnectionUrl.toString() + ); + + this.wsConnection = new WebSocket(this.wsConnectionUrl, protocol, options); + + // Handle WebSocket message + this.wsConnection.on( + 'message', + this.onMessage.bind(this) as (this: WebSocket, data: RawData, isBinary: boolean) => void + ); + // Handle WebSocket error + this.wsConnection.on( + 'error', + this.onError.bind(this) as (this: WebSocket, error: Error) => void + ); + // Handle WebSocket close + this.wsConnection.on( + 'close', + this.onClose.bind(this) as (this: WebSocket, code: number, reason: Buffer) => void + ); + // Handle WebSocket open + this.wsConnection.on('open', this.onOpen.bind(this) as (this: WebSocket) => void); + // Handle WebSocket ping + this.wsConnection.on('ping', this.onPing.bind(this) as (this: WebSocket, data: Buffer) => void); + // Handle WebSocket pong + this.wsConnection.on('pong', this.onPong.bind(this) as (this: WebSocket, data: Buffer) => void); + } + + public closeWSConnection(): void { + if (this.isWebSocketConnectionOpened()) { + this.wsConnection.close(); + this.wsConnection = null; + } + } + private flushMessageBuffer() { if (this.messageBuffer.size > 0) { this.messageBuffer.forEach((message) => { @@ -864,7 +924,7 @@ export default class ChargingStation { this.templateFile }`; logger.error(errMsg); - throw new Error(errMsg); + throw new BaseError(errMsg); } private initialize(): void { @@ -1359,7 +1419,7 @@ export default class ChargingStation { let errMsg: string; try { const request = JSON.parse(data.toString()) as IncomingRequest | Response | ErrorResponse; - if (Utils.isIterable(request)) { + if (Array.isArray(request) === true) { [messageType, messageId] = request; // Check the type of message switch (messageType) { @@ -1396,12 +1456,12 @@ export default class ChargingStation { } // Respond cachedRequest = this.requests.get(messageId); - if (Utils.isIterable(cachedRequest)) { + if (Array.isArray(cachedRequest) === true) { [responseCallback, , requestCommandName, requestPayload] = cachedRequest; } else { throw new OCPPError( ErrorType.PROTOCOL_ERROR, - `Cached request for message id ${messageId} response is not iterable`, + `Cached request for message id ${messageId} response is not an array`, null, cachedRequest as unknown as JsonType ); @@ -1426,12 +1486,12 @@ export default class ChargingStation { ); } cachedRequest = this.requests.get(messageId); - if (Utils.isIterable(cachedRequest)) { + if (Array.isArray(cachedRequest) === true) { [, errorCallback, requestCommandName] = cachedRequest; } else { throw new OCPPError( ErrorType.PROTOCOL_ERROR, - `Cached request for message id ${messageId} error response is not iterable`, + `Cached request for message id ${messageId} error response is not an array`, null, cachedRequest as unknown as JsonType ); @@ -1452,7 +1512,7 @@ export default class ChargingStation { } parentPort.postMessage(MessageChannelUtils.buildUpdatedMessage(this)); } else { - throw new OCPPError(ErrorType.PROTOCOL_ERROR, 'Incoming message is not iterable', null, { + throw new OCPPError(ErrorType.PROTOCOL_ERROR, 'Incoming message is not an array', null, { payload: request, }); } @@ -1876,74 +1936,6 @@ export default class ChargingStation { } } - private openWSConnection( - options: WsOptions = this.stationInfo?.wsOptions ?? {}, - params: { closeOpened?: boolean; terminateOpened?: boolean } = { - closeOpened: false, - terminateOpened: false, - } - ): void { - options.handshakeTimeout = options?.handshakeTimeout ?? this.getConnectionTimeout() * 1000; - params.closeOpened = params?.closeOpened ?? false; - params.terminateOpened = params?.terminateOpened ?? false; - if ( - !Utils.isNullOrUndefined(this.stationInfo.supervisionUser) && - !Utils.isNullOrUndefined(this.stationInfo.supervisionPassword) - ) { - options.auth = `${this.stationInfo.supervisionUser}:${this.stationInfo.supervisionPassword}`; - } - if (params?.closeOpened) { - this.closeWSConnection(); - } - if (params?.terminateOpened) { - this.terminateWSConnection(); - } - let protocol: string; - switch (this.getOcppVersion()) { - case OCPPVersion.VERSION_16: - protocol = 'ocpp' + OCPPVersion.VERSION_16; - break; - default: - this.handleUnsupportedVersion(this.getOcppVersion()); - break; - } - - logger.info( - this.logPrefix() + ' Open OCPP connection to URL ' + this.wsConnectionUrl.toString() - ); - - this.wsConnection = new WebSocket(this.wsConnectionUrl, protocol, options); - - // Handle WebSocket message - this.wsConnection.on( - 'message', - this.onMessage.bind(this) as (this: WebSocket, data: RawData, isBinary: boolean) => void - ); - // Handle WebSocket error - this.wsConnection.on( - 'error', - this.onError.bind(this) as (this: WebSocket, error: Error) => void - ); - // Handle WebSocket close - this.wsConnection.on( - 'close', - this.onClose.bind(this) as (this: WebSocket, code: number, reason: Buffer) => void - ); - // Handle WebSocket open - this.wsConnection.on('open', this.onOpen.bind(this) as (this: WebSocket) => void); - // Handle WebSocket ping - this.wsConnection.on('ping', this.onPing.bind(this) as (this: WebSocket, data: Buffer) => void); - // Handle WebSocket pong - this.wsConnection.on('pong', this.onPong.bind(this) as (this: WebSocket, data: Buffer) => void); - } - - private closeWSConnection(): void { - if (this.isWebSocketConnectionOpened()) { - this.wsConnection.close(); - this.wsConnection = null; - } - } - private terminateWSConnection(): void { if (this.isWebSocketConnectionOpened()) { this.wsConnection.terminate(); @@ -2023,38 +2015,4 @@ export default class ChargingStation { this.getConnectorStatus(connectorId).energyActiveImportRegisterValue = 0; this.getConnectorStatus(connectorId).transactionEnergyActiveImportRegisterValue = 0; } - - private async handleWorkerBroadcastChannelMessage(message: MessageEvent): Promise { - const [command, payload] = message.data as unknown as [ - ProcedureName, - WorkerBroadcastChannelData - ]; - - if (payload.hashId !== this.hashId) { - return; - } - - switch (command) { - case ProcedureName.START_TRANSACTION: - await this.ocppRequestService.requestHandler< - StartTransactionRequest, - StartTransactionResponse - >(this, RequestCommand.START_TRANSACTION, { - connectorId: payload.connectorId, - idTag: payload.idTag, - }); - break; - case ProcedureName.STOP_TRANSACTION: - await this.ocppRequestService.requestHandler< - StopTransactionRequest, - StopTransactionResponse - >(this, RequestCommand.STOP_TRANSACTION, { - transactionId: payload.transactionId, - meterStop: this.getEnergyActiveImportRegisterByTransactionId(payload.transactionId), - idTag: this.getTransactionIdTag(payload.transactionId), - reason: StopTransactionReason.NONE, - }); - break; - } - } }