import { URL } from 'url';
import { parentPort } from 'worker_threads';
-import WebSocket, { Data, RawData } from 'ws';
+import WebSocket, { Data, MessageEvent, RawData } from 'ws';
import BaseError from '../exception/BaseError';
import OCPPError from '../exception/OCPPError';
PowerUnits,
WsOptions,
} from '../types/ChargingStationTemplate';
-import { ChargingStationWorkerMessageEvents } from '../types/ChargingStationWorker';
import { SupervisionUrlDistribution } from '../types/ConfigurationData';
import { ConnectorStatus } from '../types/ConnectorStatus';
import { FileType } from '../types/FileType';
StatusNotificationResponse,
} from '../types/ocpp/Responses';
import {
+ StartTransactionRequest,
+ StartTransactionResponse,
StopTransactionReason,
StopTransactionRequest,
StopTransactionResponse,
} from '../types/ocpp/Transaction';
+import { ProcedureName } from '../types/UIProtocol';
import { WSError, WebSocketCloseEventStatusCode } from '../types/WebSocket';
+import { WorkerBroadcastChannelData } from '../types/WorkerBroadcastChannel';
import Configuration from '../utils/Configuration';
import Constants from '../utils/Constants';
import { ACElectricUtils, DCElectricUtils } from '../utils/ElectricUtils';
import AutomaticTransactionGenerator from './AutomaticTransactionGenerator';
import { ChargingStationConfigurationUtils } from './ChargingStationConfigurationUtils';
import { ChargingStationUtils } from './ChargingStationUtils';
+import { MessageChannelUtils } from './MessageChannelUtils';
import OCPP16IncomingRequestService from './ocpp/1.6/OCPP16IncomingRequestService';
import OCPP16RequestService from './ocpp/1.6/OCPP16RequestService';
import OCPP16ResponseService from './ocpp/1.6/OCPP16ResponseService';
import OCPPIncomingRequestService from './ocpp/OCPPIncomingRequestService';
import OCPPRequestService from './ocpp/OCPPRequestService';
import SharedLRUCache from './SharedLRUCache';
+import WorkerBroadcastChannel from './WorkerBroadcastChannel';
export default class ChargingStation {
public hashId!: string;
private readonly sharedLRUCache: SharedLRUCache;
private automaticTransactionGenerator!: AutomaticTransactionGenerator;
private webSocketPingSetInterval!: NodeJS.Timeout;
+ private workerBroadcastChannel: WorkerBroadcastChannel;
constructor(index: number, templateFile: string) {
this.index = index;
this.connectors = new Map<number, ConnectorStatus>();
this.requests = new Map<string, CachedRequest>();
this.messageBuffer = new Set<string>();
+ this.workerBroadcastChannel = new WorkerBroadcastChannel();
+
+ this.workerBroadcastChannel.onmessage = this.handleWorkerBroadcastChannelMessage.bind(this) as (
+ message: MessageEvent
+ ) => void;
+
this.initialize();
}
// FIXME?: restart heartbeat and WebSocket ping when their interval values have changed
} catch (error) {
logger.error(
- `${this.logPrefix()} ${FileType.ChargingStationTemplate} file monitoring error: %j`,
+ `${this.logPrefix()} ${FileType.ChargingStationTemplate} file monitoring error:`,
error
);
}
}
}
);
- parentPort.postMessage({
- id: ChargingStationWorkerMessageEvents.STARTED,
- data: { id: this.stationInfo.chargingStationId },
- });
+ parentPort.postMessage(MessageChannelUtils.buildStartedMessage(this));
}
public async stop(reason: StopTransactionReason = StopTransactionReason.NONE): Promise<void> {
this.templateFileWatcher.close();
this.sharedLRUCache.deleteChargingStationTemplate(this.stationInfo?.templateHash);
this.bootNotificationResponse = null;
- parentPort.postMessage({
- id: ChargingStationWorkerMessageEvents.STOPPED,
- data: { id: this.stationInfo.chargingStationId },
- });
+ parentPort.postMessage(MessageChannelUtils.buildStoppedMessage(this));
this.stopped = true;
}
logger.error(errMsg);
throw new OCPPError(ErrorType.PROTOCOL_ERROR, errMsg);
}
+ parentPort.postMessage(MessageChannelUtils.buildUpdatedMessage(this));
} else {
throw new OCPPError(ErrorType.PROTOCOL_ERROR, 'Incoming message is not iterable', null, {
payload: request,
} catch (error) {
// Log
logger.error(
- "%s Incoming OCPP '%s' message '%j' matching cached request '%j' processing error: %j",
+ "%s Incoming OCPP '%s' message '%j' matching cached request '%j' processing error:",
this.logPrefix(),
commandName ?? requestCommandName ?? null,
data.toString(),
);
if (!(error instanceof OCPPError)) {
logger.warn(
- "%s Error thrown at incoming OCPP '%s' message '%j' handling is not an OCPPError: %j",
+ "%s Error thrown at incoming OCPP '%s' message '%j' handling is not an OCPPError:",
this.logPrefix(),
commandName ?? requestCommandName ?? null,
data.toString(),
private onError(error: WSError): void {
this.closeWSConnection();
- logger.error(this.logPrefix() + ' WebSocket error: %j', error);
+ logger.error(this.logPrefix() + ' WebSocket error:', error);
}
private getUseConnectorId0(stationInfo?: ChargingStationInfo): boolean | undefined {
this.getConnectorStatus(connectorId).energyActiveImportRegisterValue = 0;
this.getConnectorStatus(connectorId).transactionEnergyActiveImportRegisterValue = 0;
}
+
+ private async handleWorkerBroadcastChannelMessage(message: MessageEvent): Promise<void> {
+ const [command, payload] = message.data as unknown as [
+ ProcedureName,
+ WorkerBroadcastChannelData
+ ];
+
+ if (payload.hashId !== this.hashId) {
+ return;
+ }
+
+ switch (command) {
+ case ProcedureName.START_TRANSACTION:
+ await this.ocppRequestService.requestHandler<
+ StartTransactionRequest,
+ StartTransactionResponse
+ >(this, RequestCommand.START_TRANSACTION, {
+ connectorId: payload.connectorId,
+ idTag: payload.idTag,
+ });
+ break;
+ case ProcedureName.STOP_TRANSACTION:
+ await this.ocppRequestService.requestHandler<
+ StopTransactionRequest,
+ StopTransactionResponse
+ >(this, RequestCommand.STOP_TRANSACTION, {
+ transactionId: payload.transactionId,
+ meterStop: this.getEnergyActiveImportRegisterByTransactionId(payload.transactionId),
+ idTag: this.getTransactionIdTag(payload.transactionId),
+ reason: StopTransactionReason.NONE,
+ });
+ break;
+ }
+ }
}