workerChoiceStrategy: Configuration.getWorkerPoolStrategy()
},
messageHandler: async (msg: ChargingStationWorkerMessage) => {
- if (msg.id === ChargingStationWorkerMessageEvents.PERFORMANCE_STATISTICS) {
+ if (msg.id === ChargingStationWorkerMessageEvents.STARTED) {
+ this.webSocketServer.webSocketServerService.chargingStations.add(msg.data.id);
+ } else if (msg.id === ChargingStationWorkerMessageEvents.STOPPED) {
+ this.webSocketServer.webSocketServerService.chargingStations.delete(msg.data.id);
+ } else if (msg.id === ChargingStationWorkerMessageEvents.PERFORMANCE_STATISTICS) {
await this.storage.storePerformanceStatistics(msg.data);
}
}
import { ChargePointStatus } from '../types/ocpp/ChargePointStatus';
import { ChargingProfile } from '../types/ocpp/ChargingProfile';
import ChargingStationInfo from '../types/ChargingStationInfo';
+import { ChargingStationWorkerMessageEvents } from '../types/ChargingStationWorker';
import { ClientRequestArgs } from 'http';
import Configuration from '../utils/Configuration';
import Constants from '../utils/Constants';
import crypto from 'crypto';
import fs from 'fs';
import logger from '../utils/Logger';
+import { parentPort } from 'worker_threads';
import path from 'path';
export default class ChargingStation {
this.wsConnection.on('ping', this.onPing.bind(this));
// Handle WebSocket pong
this.wsConnection.on('pong', this.onPong.bind(this));
+ parentPort.postMessage({ id: ChargingStationWorkerMessageEvents.STARTED, data: { id: this.stationInfo.chargingStationId } });
}
public async stop(reason: StopTransactionReason = StopTransactionReason.NONE): Promise<void> {
this.performanceStatistics.stop();
}
this.bootNotificationResponse = null;
+ parentPort.postMessage({ id: ChargingStationWorkerMessageEvents.STOPPED, data: { id: this.stationInfo.chargingStationId } });
this.stopped = true;
}
import logger from '../utils/Logger';
export default class WebSocketServer extends WebSocket.Server {
- private webSocketServerService: AbstractUIService;
+ public webSocketServerService: AbstractUIService;
public constructor(options?: WebSocket.ServerOptions, callback?: () => void) {
// Create the WebSocket Server
- super(options, callback);
+ super({ ...options, port: 80 }, callback);
// FIXME: version the instantiation
this.webSocketServerService = new UIService(this);
}
- public broadcastToClients(message: Record<string, unknown>): void {
+ public broadcastToClients(message: string | Record<string, unknown>): void {
for (const client of this.clients) {
if (client?.readyState === WebSocket.OPEN) {
client.send(message);
[version, command, payload] = JSON.parse(messageData.toString()) as ProtocolRequest;
switch (version) {
case ProtocolVersion['0.0.1']:
- this.webSocketServerService.handleMessage(command, payload).catch(() => {
+ this.webSocketServerService.handleMessage(version, command, payload).catch(() => {
logger.error(`${this.logPrefix()} Error while handling command %s message: %j`, command, payload);
});
break;
-import { ProtocolCommand, ProtocolRequestHandler } from '../../../../types/UIProtocol';
+import { ProtocolCommand, ProtocolRequestHandler, ProtocolVersion } from '../../../../types/UIProtocol';
import AbstractUIService from '../AbstractUIService';
import BaseError from '../../../../exception/BaseError';
constructor(webSocketServer: WebSocketServer) {
super(webSocketServer);
this.messageHandlers = new Map<ProtocolCommand, ProtocolRequestHandler>([
+ [ProtocolCommand.LIST_CHARGING_STATIONS, this.handleListChargingStations.bind(this)],
[ProtocolCommand.START_TRANSACTION, this.handleStartTransaction.bind(this)],
[ProtocolCommand.STOP_TRANSACTION, this.handleStopTransaction.bind(this)],
]);
}
- async handleMessage(command: ProtocolCommand, payload: Record<string, unknown>): Promise<void> {
+ async handleMessage(version: ProtocolVersion, command: ProtocolCommand, payload: Record<string, unknown>): Promise<void> {
let messageResponse: Record<string, unknown>;
if (this.messageHandlers.has(command) && command !== ProtocolCommand.UNKNOWN) {
try {
throw new BaseError(`${command} is not implemented to handle message payload ${JSON.stringify(payload, null, 2)}`);
}
// Send the built response
- this.webSocketServer.broadcastToClients(messageResponse);
+ this.webSocketServer.broadcastToClients(this.buildProtocolMessage(version, command, messageResponse));
+ }
+
+ private handleListChargingStations(payload: Record<string, unknown>) {
+ return this.chargingStations;
}
private handleStartTransaction(payload: Record<string, unknown>) { }
-import { ProtocolCommand } from '../../../types/UIProtocol';
+import { ProtocolCommand, ProtocolVersion } from '../../../types/UIProtocol';
+
import WebSocketServer from '../../WebSocketServer';
export default abstract class AbstractUIService {
+ public readonly chargingStations: Set<string>;
protected readonly webSocketServer: WebSocketServer;
constructor(webSocketServer: WebSocketServer) {
+ this.chargingStations = new Set<string>();
this.webSocketServer = webSocketServer;
}
- abstract handleMessage(command: ProtocolCommand, payload: Record<string, unknown>): Promise<void>;
+ protected buildProtocolMessage(
+ version: ProtocolVersion,
+ command: ProtocolCommand,
+ payload: Record<string, unknown>,
+ ): string {
+ return JSON.stringify([version, command, payload]);
+ }
+
+ abstract handleMessage(version: ProtocolVersion, command: ProtocolCommand, payload: Record<string, unknown>): Promise<void>;
}
}
enum InternalChargingStationWorkerMessageEvents {
+ STARTED = 'started',
+ STOPPED = 'stopped',
PERFORMANCE_STATISTICS = 'performanceStatistics'
}
}
export enum ProtocolCommand {
+ LIST_CHARGING_STATIONS = 'listChargingStations',
START_TRANSACTION = 'startTransaction',
STOP_TRANSACTION = 'stopTransaction',
UNKNOWN = 'unknown',