private bootNotificationResponse!: BootNotificationResponse | null;
private connectorsConfigurationHash!: string;
private ocppIncomingRequestService!: OCPPIncomingRequestService;
- private readonly messageQueue: string[];
+ private readonly messageBuffer: Set<string>;
private wsConnectionUrl!: URL;
private wsConnectionRestarted: boolean;
private stopped: boolean;
this.autoReconnectRetryCount = 0;
this.requests = new Map<string, CachedRequest>();
- this.messageQueue = new Array<string>();
+ this.messageBuffer = new Set<string>();
this.authorizedTags = this.getAuthorizedTags();
}
this.stopMeterValues(connectorId);
}
- public addToMessageQueue(message: string): void {
- let dups = false;
- // Handle dups in message queue
- for (const bufferedMessage of this.messageQueue) {
- // Message already in the queue
- if (message === bufferedMessage) {
- dups = true;
- break;
- }
- }
- if (!dups) {
- // Queue message
- this.messageQueue.push(message);
- }
+ public bufferMessage(message: string): void {
+ this.messageBuffer.add(message);
}
- private flushMessageQueue() {
- if (!Utils.isEmptyArray(this.messageQueue)) {
- this.messageQueue.forEach((message, index) => {
- this.messageQueue.splice(index, 1);
+ private flushMessageBuffer() {
+ if (this.messageBuffer.size > 0) {
+ this.messageBuffer.forEach((message) => {
// TODO: evaluate the need to track performance
this.wsConnection.send(message);
+ this.messageBuffer.delete(message);
});
}
}
await this.startMessageSequence();
this.stopped && (this.stopped = false);
if (this.wsConnectionRestarted && this.isWebSocketConnectionOpened()) {
- this.flushMessageQueue();
+ this.flushMessageBuffer();
}
} else {
logger.error(`${this.logPrefix()} Registration failure: max retries reached (${this.getRegistrationMaxRetries()}) or retry disabled (${this.getRegistrationMaxRetries()})`);
PerformanceStatistics.endMeasure(commandName, beginId);
} else if (!skipBufferingOnError) {
// Buffer it
- this.chargingStation.addToMessageQueue(messageToSend);
+ this.chargingStation.bufferMessage(messageToSend);
const ocppError = new OCPPError(ErrorType.GENERIC_ERROR, `WebSocket closed for buffered message id '${messageId}' with content '${messageToSend}'`, commandParams?.details ?? {});
if (messageType === MessageType.CALL_MESSAGE) {
// Reject it but keep the request in the cache