From 1b2acf4e9c00cc7272ec7769b4e82113d61f64fb Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 18 Nov 2023 23:06:52 +0100 Subject: [PATCH] fix: ensure OCPP request timeouting cancel it MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../ocpp/OCPPRequestService.ts | 258 +++++++++--------- src/utils/Utils.ts | 28 -- src/utils/index.ts | 1 - ui/web/src/composables/UIClient.ts | 42 ++- ui/web/src/composables/Utils.ts | 22 -- 5 files changed, 143 insertions(+), 208 deletions(-) diff --git a/src/charging-station/ocpp/OCPPRequestService.ts b/src/charging-station/ocpp/OCPPRequestService.ts index 69fb503e..c8d4c2fa 100644 --- a/src/charging-station/ocpp/OCPPRequestService.ts +++ b/src/charging-station/ocpp/OCPPRequestService.ts @@ -23,13 +23,7 @@ import { type ResponseCallback, type ResponseType, } from '../../types'; -import { - Constants, - cloneObject, - handleSendMessageError, - logger, - promiseWithTimeout, -} from '../../utils'; +import { Constants, cloneObject, handleSendMessageError, logger } from '../../utils'; const moduleName = 'OCPPRequestService'; @@ -321,145 +315,141 @@ export abstract class OCPPRequestService { // eslint-disable-next-line @typescript-eslint/no-this-alias const self = this; // Send a message through wsConnection - return promiseWithTimeout( - new Promise((resolve, reject) => { - /** - * Function that will receive the request's response - * - * @param payload - - * @param requestPayload - - */ - const responseCallback = (payload: JsonType, requestPayload: JsonType): void => { - if (chargingStation.stationInfo?.enableStatistics === true) { - chargingStation.performanceStatistics?.addRequestStatistic( - commandName, - MessageType.CALL_RESULT_MESSAGE, - ); - } - // Handle the request's response - self.ocppResponseService - .responseHandler( - chargingStation, - commandName as RequestCommand, - payload, - requestPayload, - ) - .then(() => { - resolve(payload); - }) - .catch((error) => { - reject(error); - }) - .finally(() => { - chargingStation.requests.delete(messageId); - }); - }; - - /** - * Function that will receive the request's error response - * - * @param error - - * @param requestStatistic - - */ - const errorCallback = (error: OCPPError, requestStatistic = true): void => { - if ( - requestStatistic === true && - chargingStation.stationInfo?.enableStatistics === true - ) { - chargingStation.performanceStatistics?.addRequestStatistic( - commandName, - MessageType.CALL_ERROR_MESSAGE, - ); - } - logger.error( - `${chargingStation.logPrefix()} Error occurred at ${OCPPServiceUtils.getMessageTypeString( - messageType, - )} command ${commandName} with PDU %j:`, - messagePayload, - error, + return await new Promise((resolve, reject) => { + /** + * Function that will receive the request's response + * + * @param payload - + * @param requestPayload - + */ + const responseCallback = (payload: JsonType, requestPayload: JsonType): void => { + if (chargingStation.stationInfo?.enableStatistics === true) { + chargingStation.performanceStatistics?.addRequestStatistic( + commandName, + MessageType.CALL_RESULT_MESSAGE, ); - chargingStation.requests.delete(messageId); - reject(error); - }; + } + // Handle the request's response + self.ocppResponseService + .responseHandler( + chargingStation, + commandName as RequestCommand, + payload, + requestPayload, + ) + .then(() => { + resolve(payload); + }) + .catch((error) => { + reject(error); + }) + .finally(() => { + chargingStation.requests.delete(messageId); + }); + }; - if (chargingStation.stationInfo?.enableStatistics === true) { - chargingStation.performanceStatistics?.addRequestStatistic(commandName, messageType); + /** + * Function that will receive the request's error response + * + * @param error - + * @param requestStatistic - + */ + const errorCallback = (error: OCPPError, requestStatistic = true): void => { + if (requestStatistic === true && chargingStation.stationInfo?.enableStatistics === true) { + chargingStation.performanceStatistics?.addRequestStatistic( + commandName, + MessageType.CALL_ERROR_MESSAGE, + ); } - const messageToSend = this.buildMessageToSend( - chargingStation, - messageId, + logger.error( + `${chargingStation.logPrefix()} Error occurred at ${OCPPServiceUtils.getMessageTypeString( + messageType, + )} command ${commandName} with PDU %j:`, messagePayload, - messageType, - commandName, - responseCallback, - errorCallback, + error, ); - let sendError = false; - // Check if wsConnection opened - const wsOpened = chargingStation.isWebSocketConnectionOpened() === true; - if (wsOpened) { - const beginId = PerformanceStatistics.beginMeasure(commandName); - try { - chargingStation.wsConnection?.send(messageToSend); - logger.debug( - `${chargingStation.logPrefix()} >> Command '${commandName}' sent ${OCPPServiceUtils.getMessageTypeString( - messageType, - )} payload: ${messageToSend}`, - ); - } catch (error) { - logger.error( - `${chargingStation.logPrefix()} >> Command '${commandName}' failed to send ${OCPPServiceUtils.getMessageTypeString( - messageType, - )} payload: ${messageToSend}:`, - error, + chargingStation.requests.delete(messageId); + reject(error); + }; + + if (chargingStation.stationInfo?.enableStatistics === true) { + chargingStation.performanceStatistics?.addRequestStatistic(commandName, messageType); + } + const messageToSend = this.buildMessageToSend( + chargingStation, + messageId, + messagePayload, + messageType, + commandName, + responseCallback, + errorCallback, + ); + let sendError = false; + // Check if wsConnection opened + const wsOpened = chargingStation.isWebSocketConnectionOpened() === true; + if (wsOpened) { + const beginId = PerformanceStatistics.beginMeasure(commandName); + try { + setTimeout(() => { + return errorCallback( + new OCPPError( + ErrorType.GENERIC_ERROR, + `Timeout for message id '${messageId}'`, + commandName, + (messagePayload as JsonObject)?.details ?? Constants.EMPTY_FROZEN_OBJECT, + ), + false, ); - sendError = true; - } - PerformanceStatistics.endMeasure(commandName, beginId); - } - const wsClosedOrErrored = !wsOpened || sendError === true; - if (wsClosedOrErrored && params?.skipBufferingOnError === false) { - // Buffer - chargingStation.bufferMessage(messageToSend); - // Reject and keep request in the cache - return reject( - new OCPPError( - ErrorType.GENERIC_ERROR, - `WebSocket closed or errored for buffered message id '${messageId}' with content '${messageToSend}'`, - commandName, - (messagePayload as JsonObject)?.details ?? Constants.EMPTY_FROZEN_OBJECT, - ), + }, OCPPConstants.OCPP_WEBSOCKET_TIMEOUT); + chargingStation.wsConnection?.send(messageToSend); + logger.debug( + `${chargingStation.logPrefix()} >> Command '${commandName}' sent ${OCPPServiceUtils.getMessageTypeString( + messageType, + )} payload: ${messageToSend}`, + ); + } catch (error) { + logger.error( + `${chargingStation.logPrefix()} >> Command '${commandName}' failed to send ${OCPPServiceUtils.getMessageTypeString( + messageType, + )} payload: ${messageToSend}:`, + error, ); - } else if (wsClosedOrErrored) { - const ocppError = new OCPPError( + sendError = true; + } + PerformanceStatistics.endMeasure(commandName, beginId); + } + const wsClosedOrErrored = !wsOpened || sendError === true; + if (wsClosedOrErrored && params?.skipBufferingOnError === false) { + // Buffer + chargingStation.bufferMessage(messageToSend); + // Reject and keep request in the cache + return reject( + new OCPPError( ErrorType.GENERIC_ERROR, - `WebSocket closed or errored for non buffered message id '${messageId}' with content '${messageToSend}'`, + `WebSocket closed or errored for buffered message id '${messageId}' with content '${messageToSend}'`, commandName, (messagePayload as JsonObject)?.details ?? Constants.EMPTY_FROZEN_OBJECT, - ); - // Reject response - if (messageType !== MessageType.CALL_MESSAGE) { - return reject(ocppError); - } - // Reject and remove request from the cache - return errorCallback(ocppError, false); - } - // Resolve response + ), + ); + } else if (wsClosedOrErrored) { + const ocppError = new OCPPError( + ErrorType.GENERIC_ERROR, + `WebSocket closed or errored for non buffered message id '${messageId}' with content '${messageToSend}'`, + commandName, + (messagePayload as JsonObject)?.details ?? Constants.EMPTY_FROZEN_OBJECT, + ); + // Reject response if (messageType !== MessageType.CALL_MESSAGE) { - return resolve(messagePayload); + return reject(ocppError); } - }), - OCPPConstants.OCPP_WEBSOCKET_TIMEOUT, - new OCPPError( - ErrorType.GENERIC_ERROR, - `Timeout for message id '${messageId}'`, - commandName, - (messagePayload as JsonObject)?.details ?? Constants.EMPTY_FROZEN_OBJECT, - ), - () => { - messageType === MessageType.CALL_MESSAGE && chargingStation.requests.delete(messageId); - }, - ); + // Reject and remove request from the cache + return errorCallback(ocppError, false); + } + // Resolve response + if (messageType !== MessageType.CALL_MESSAGE) { + return resolve(messagePayload); + } + }); } throw new OCPPError( ErrorType.SECURITY_ERROR, diff --git a/src/utils/Utils.ts b/src/utils/Utils.ts index add232ac..efdffc2f 100644 --- a/src/utils/Utils.ts +++ b/src/utils/Utils.ts @@ -1,6 +1,5 @@ import { randomBytes, randomInt, randomUUID, webcrypto } from 'node:crypto'; import { env, nextTick } from 'node:process'; -import { inspect } from 'node:util'; import { formatDuration, @@ -331,33 +330,6 @@ export const exponentialDelay = (retryNumber = 0, delayFactor = 100): number => return delay + randomSum; }; -const isPromisePending = (promise: Promise): boolean => { - return inspect(promise).includes('pending'); -}; - -export const promiseWithTimeout = async ( - promise: Promise, - timeoutMs: number, - timeoutError: Error, - timeoutCallback: () => void = () => { - /* This is intentional */ - }, -): Promise => { - // Creates a timeout promise that rejects in timeout milliseconds - const timeoutPromise = new Promise((_, reject) => { - setTimeout(() => { - if (isPromisePending(promise)) { - timeoutCallback(); - // FIXME: The original promise shall be canceled - } - reject(timeoutError); - }, timeoutMs); - }); - - // Returns a race between timeout promise and the passed promise - return Promise.race([promise, timeoutPromise]); -}; - /** * Generates a cryptographically secure random number in the [0,1[ range * diff --git a/src/utils/index.ts b/src/utils/index.ts index f3feeb66..7c876834 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -52,7 +52,6 @@ export { max, min, once, - promiseWithTimeout, roundTo, secureRandom, sleep, diff --git a/ui/web/src/composables/UIClient.ts b/ui/web/src/composables/UIClient.ts index bbb4a449..c3107d63 100644 --- a/ui/web/src/composables/UIClient.ts +++ b/ui/web/src/composables/UIClient.ts @@ -1,4 +1,3 @@ -import { promiseWithTimeout } from './Utils'; import { ProcedureName, type ProtocolResponse, @@ -146,28 +145,25 @@ export class UIClient { data: RequestPayload, ): Promise { let uuid: string; - return promiseWithTimeout( - new Promise((resolve, reject) => { - uuid = crypto.randomUUID(); - const msg = JSON.stringify([uuid, command, data]); - - if (this.ws.readyState !== WebSocket.OPEN) { - this.openWS(); - } - if (this.ws.readyState === WebSocket.OPEN) { - this.ws.send(msg); - } else { - throw new Error(`Send request '${command}' message: connection not opened`); - } - - this.setResponseHandler(uuid, command, resolve, reject); - }), - 120 * 1000, - Error(`Send request '${command}' message timeout`), - () => { - this.responseHandlers.delete(uuid); - }, - ); + return await new Promise((resolve, reject) => { + uuid = crypto.randomUUID(); + const msg = JSON.stringify([uuid, command, data]); + + if (this.ws.readyState !== WebSocket.OPEN) { + this.openWS(); + } + if (this.ws.readyState === WebSocket.OPEN) { + setTimeout(() => { + this.deleteResponseHandler(uuid); + return reject(new Error(`Send request '${command}' message timeout`)); + }, 60 * 1000); + this.ws.send(msg); + } else { + throw new Error(`Send request '${command}' message: connection not opened`); + } + + this.setResponseHandler(uuid, command, resolve, reject); + }); } private responseHandler(messageEvent: MessageEvent): void { diff --git a/ui/web/src/composables/Utils.ts b/ui/web/src/composables/Utils.ts index 9e1277a8..07848f36 100644 --- a/ui/web/src/composables/Utils.ts +++ b/ui/web/src/composables/Utils.ts @@ -18,28 +18,6 @@ export const ifUndefined = (value: T | undefined, isValue: T): T => { // if (isIterable(obj) === false) cb(); // }; -export const promiseWithTimeout = ( - promise: Promise, - timeoutMs: number, - timeoutError: Error, - timeoutCallback: () => void = () => { - /* This is intentional */ - }, -): Promise => { - // Create a timeout promise that rejects in timeout milliseconds - const timeoutPromise = new Promise((_, reject) => { - setTimeout(() => { - // FIXME: The original promise state shall be checked - timeoutCallback(); - // FIXME: The original promise shall be canceled - reject(timeoutError); - }, timeoutMs); - }); - - // Returns a race between timeout promise and the passed promise - return Promise.race([promise, timeoutPromise]); -}; - // export const compose = (...fns: ((arg: T) => T)[]): ((x: T) => T) => { // return (x: T) => fns.reduceRight((y, fn) => fn(y), x); // }; -- 2.34.1