fix: ensure OCPP request timeouting cancel it
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 18 Nov 2023 22:06:52 +0000 (23:06 +0100)
committerJérôme Benoit <jerome.benoit@sap.com>
Sat, 18 Nov 2023 22:06:52 +0000 (23:06 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/charging-station/ocpp/OCPPRequestService.ts
src/utils/Utils.ts
src/utils/index.ts
ui/web/src/composables/UIClient.ts
ui/web/src/composables/Utils.ts

index 69fb503eaf303beee1378157fce587c25393bc9b..c8d4c2fa9a7554ee653c5360ecab95909566c4d5 100644 (file)
@@ -23,13 +23,7 @@ import {
   type ResponseCallback,
   type ResponseType,
 } from '../../types';
-import {
-  Constants,
-  cloneObject,
-  handleSendMessageError,
-  logger,
-  promiseWithTimeout,
-} from '../../utils';
+import { Constants, cloneObject, handleSendMessageError, logger } from '../../utils';
 
 const moduleName = 'OCPPRequestService';
 
@@ -321,145 +315,141 @@ export abstract class OCPPRequestService {
       // eslint-disable-next-line @typescript-eslint/no-this-alias
       const self = this;
       // Send a message through wsConnection
-      return promiseWithTimeout(
-        new Promise<ResponseType>((resolve, reject) => {
-          /**
-           * Function that will receive the request's response
-           *
-           * @param payload -
-           * @param requestPayload -
-           */
-          const responseCallback = (payload: JsonType, requestPayload: JsonType): void => {
-            if (chargingStation.stationInfo?.enableStatistics === true) {
-              chargingStation.performanceStatistics?.addRequestStatistic(
-                commandName,
-                MessageType.CALL_RESULT_MESSAGE,
-              );
-            }
-            // Handle the request's response
-            self.ocppResponseService
-              .responseHandler(
-                chargingStation,
-                commandName as RequestCommand,
-                payload,
-                requestPayload,
-              )
-              .then(() => {
-                resolve(payload);
-              })
-              .catch((error) => {
-                reject(error);
-              })
-              .finally(() => {
-                chargingStation.requests.delete(messageId);
-              });
-          };
-
-          /**
-           * Function that will receive the request's error response
-           *
-           * @param error -
-           * @param requestStatistic -
-           */
-          const errorCallback = (error: OCPPError, requestStatistic = true): void => {
-            if (
-              requestStatistic === true &&
-              chargingStation.stationInfo?.enableStatistics === true
-            ) {
-              chargingStation.performanceStatistics?.addRequestStatistic(
-                commandName,
-                MessageType.CALL_ERROR_MESSAGE,
-              );
-            }
-            logger.error(
-              `${chargingStation.logPrefix()} Error occurred at ${OCPPServiceUtils.getMessageTypeString(
-                messageType,
-              )} command ${commandName} with PDU %j:`,
-              messagePayload,
-              error,
+      return await new Promise<ResponseType>((resolve, reject) => {
+        /**
+         * Function that will receive the request's response
+         *
+         * @param payload -
+         * @param requestPayload -
+         */
+        const responseCallback = (payload: JsonType, requestPayload: JsonType): void => {
+          if (chargingStation.stationInfo?.enableStatistics === true) {
+            chargingStation.performanceStatistics?.addRequestStatistic(
+              commandName,
+              MessageType.CALL_RESULT_MESSAGE,
             );
-            chargingStation.requests.delete(messageId);
-            reject(error);
-          };
+          }
+          // Handle the request's response
+          self.ocppResponseService
+            .responseHandler(
+              chargingStation,
+              commandName as RequestCommand,
+              payload,
+              requestPayload,
+            )
+            .then(() => {
+              resolve(payload);
+            })
+            .catch((error) => {
+              reject(error);
+            })
+            .finally(() => {
+              chargingStation.requests.delete(messageId);
+            });
+        };
 
-          if (chargingStation.stationInfo?.enableStatistics === true) {
-            chargingStation.performanceStatistics?.addRequestStatistic(commandName, messageType);
+        /**
+         * Function that will receive the request's error response
+         *
+         * @param error -
+         * @param requestStatistic -
+         */
+        const errorCallback = (error: OCPPError, requestStatistic = true): void => {
+          if (requestStatistic === true && chargingStation.stationInfo?.enableStatistics === true) {
+            chargingStation.performanceStatistics?.addRequestStatistic(
+              commandName,
+              MessageType.CALL_ERROR_MESSAGE,
+            );
           }
-          const messageToSend = this.buildMessageToSend(
-            chargingStation,
-            messageId,
+          logger.error(
+            `${chargingStation.logPrefix()} Error occurred at ${OCPPServiceUtils.getMessageTypeString(
+              messageType,
+            )} command ${commandName} with PDU %j:`,
             messagePayload,
-            messageType,
-            commandName,
-            responseCallback,
-            errorCallback,
+            error,
           );
-          let sendError = false;
-          // Check if wsConnection opened
-          const wsOpened = chargingStation.isWebSocketConnectionOpened() === true;
-          if (wsOpened) {
-            const beginId = PerformanceStatistics.beginMeasure(commandName);
-            try {
-              chargingStation.wsConnection?.send(messageToSend);
-              logger.debug(
-                `${chargingStation.logPrefix()} >> Command '${commandName}' sent ${OCPPServiceUtils.getMessageTypeString(
-                  messageType,
-                )} payload: ${messageToSend}`,
-              );
-            } catch (error) {
-              logger.error(
-                `${chargingStation.logPrefix()} >> Command '${commandName}' failed to send ${OCPPServiceUtils.getMessageTypeString(
-                  messageType,
-                )} payload: ${messageToSend}:`,
-                error,
+          chargingStation.requests.delete(messageId);
+          reject(error);
+        };
+
+        if (chargingStation.stationInfo?.enableStatistics === true) {
+          chargingStation.performanceStatistics?.addRequestStatistic(commandName, messageType);
+        }
+        const messageToSend = this.buildMessageToSend(
+          chargingStation,
+          messageId,
+          messagePayload,
+          messageType,
+          commandName,
+          responseCallback,
+          errorCallback,
+        );
+        let sendError = false;
+        // Check if wsConnection opened
+        const wsOpened = chargingStation.isWebSocketConnectionOpened() === true;
+        if (wsOpened) {
+          const beginId = PerformanceStatistics.beginMeasure(commandName);
+          try {
+            setTimeout(() => {
+              return errorCallback(
+                new OCPPError(
+                  ErrorType.GENERIC_ERROR,
+                  `Timeout for message id '${messageId}'`,
+                  commandName,
+                  (messagePayload as JsonObject)?.details ?? Constants.EMPTY_FROZEN_OBJECT,
+                ),
+                false,
               );
-              sendError = true;
-            }
-            PerformanceStatistics.endMeasure(commandName, beginId);
-          }
-          const wsClosedOrErrored = !wsOpened || sendError === true;
-          if (wsClosedOrErrored && params?.skipBufferingOnError === false) {
-            // Buffer
-            chargingStation.bufferMessage(messageToSend);
-            // Reject and keep request in the cache
-            return reject(
-              new OCPPError(
-                ErrorType.GENERIC_ERROR,
-                `WebSocket closed or errored for buffered message id '${messageId}' with content '${messageToSend}'`,
-                commandName,
-                (messagePayload as JsonObject)?.details ?? Constants.EMPTY_FROZEN_OBJECT,
-              ),
+            }, OCPPConstants.OCPP_WEBSOCKET_TIMEOUT);
+            chargingStation.wsConnection?.send(messageToSend);
+            logger.debug(
+              `${chargingStation.logPrefix()} >> Command '${commandName}' sent ${OCPPServiceUtils.getMessageTypeString(
+                messageType,
+              )} payload: ${messageToSend}`,
+            );
+          } catch (error) {
+            logger.error(
+              `${chargingStation.logPrefix()} >> Command '${commandName}' failed to send ${OCPPServiceUtils.getMessageTypeString(
+                messageType,
+              )} payload: ${messageToSend}:`,
+              error,
             );
-          } else if (wsClosedOrErrored) {
-            const ocppError = new OCPPError(
+            sendError = true;
+          }
+          PerformanceStatistics.endMeasure(commandName, beginId);
+        }
+        const wsClosedOrErrored = !wsOpened || sendError === true;
+        if (wsClosedOrErrored && params?.skipBufferingOnError === false) {
+          // Buffer
+          chargingStation.bufferMessage(messageToSend);
+          // Reject and keep request in the cache
+          return reject(
+            new OCPPError(
               ErrorType.GENERIC_ERROR,
-              `WebSocket closed or errored for non buffered message id '${messageId}' with content '${messageToSend}'`,
+              `WebSocket closed or errored for buffered message id '${messageId}' with content '${messageToSend}'`,
               commandName,
               (messagePayload as JsonObject)?.details ?? Constants.EMPTY_FROZEN_OBJECT,
-            );
-            // Reject response
-            if (messageType !== MessageType.CALL_MESSAGE) {
-              return reject(ocppError);
-            }
-            // Reject and remove request from the cache
-            return errorCallback(ocppError, false);
-          }
-          // Resolve response
+            ),
+          );
+        } else if (wsClosedOrErrored) {
+          const ocppError = new OCPPError(
+            ErrorType.GENERIC_ERROR,
+            `WebSocket closed or errored for non buffered message id '${messageId}' with content '${messageToSend}'`,
+            commandName,
+            (messagePayload as JsonObject)?.details ?? Constants.EMPTY_FROZEN_OBJECT,
+          );
+          // Reject response
           if (messageType !== MessageType.CALL_MESSAGE) {
-            return resolve(messagePayload);
+            return reject(ocppError);
           }
-        }),
-        OCPPConstants.OCPP_WEBSOCKET_TIMEOUT,
-        new OCPPError(
-          ErrorType.GENERIC_ERROR,
-          `Timeout for message id '${messageId}'`,
-          commandName,
-          (messagePayload as JsonObject)?.details ?? Constants.EMPTY_FROZEN_OBJECT,
-        ),
-        () => {
-          messageType === MessageType.CALL_MESSAGE && chargingStation.requests.delete(messageId);
-        },
-      );
+          // Reject and remove request from the cache
+          return errorCallback(ocppError, false);
+        }
+        // Resolve response
+        if (messageType !== MessageType.CALL_MESSAGE) {
+          return resolve(messagePayload);
+        }
+      });
     }
     throw new OCPPError(
       ErrorType.SECURITY_ERROR,
index add232ac159f8976b557efbdb56ac37f6685652b..efdffc2f2bdc5b59b12acaf55a425a200982b86f 100644 (file)
@@ -1,6 +1,5 @@
 import { randomBytes, randomInt, randomUUID, webcrypto } from 'node:crypto';
 import { env, nextTick } from 'node:process';
-import { inspect } from 'node:util';
 
 import {
   formatDuration,
@@ -331,33 +330,6 @@ export const exponentialDelay = (retryNumber = 0, delayFactor = 100): number =>
   return delay + randomSum;
 };
 
-const isPromisePending = (promise: Promise<unknown>): boolean => {
-  return inspect(promise).includes('pending');
-};
-
-export const promiseWithTimeout = async <T>(
-  promise: Promise<T>,
-  timeoutMs: number,
-  timeoutError: Error,
-  timeoutCallback: () => void = () => {
-    /* This is intentional */
-  },
-): Promise<T> => {
-  // Creates a timeout promise that rejects in timeout milliseconds
-  const timeoutPromise = new Promise<never>((_, reject) => {
-    setTimeout(() => {
-      if (isPromisePending(promise)) {
-        timeoutCallback();
-        // FIXME: The original promise shall be canceled
-      }
-      reject(timeoutError);
-    }, timeoutMs);
-  });
-
-  // Returns a race between timeout promise and the passed promise
-  return Promise.race<T>([promise, timeoutPromise]);
-};
-
 /**
  * Generates a cryptographically secure random number in the [0,1[ range
  *
index f3feeb66d2bfd842d8fad04f9379fce44904f85f..7c876834ed260ee32aad32656edab3d7e5fac3f5 100644 (file)
@@ -52,7 +52,6 @@ export {
   max,
   min,
   once,
-  promiseWithTimeout,
   roundTo,
   secureRandom,
   sleep,
index bbb4a4499b233a6aead4703eaf1b3de93004e32e..c3107d6310401f91b18056e6691e884e04aa2053 100644 (file)
@@ -1,4 +1,3 @@
-import { promiseWithTimeout } from './Utils';
 import {
   ProcedureName,
   type ProtocolResponse,
@@ -146,28 +145,25 @@ export class UIClient {
     data: RequestPayload,
   ): Promise<ResponsePayload> {
     let uuid: string;
-    return promiseWithTimeout(
-      new Promise<ResponsePayload>((resolve, reject) => {
-        uuid = crypto.randomUUID();
-        const msg = JSON.stringify([uuid, command, data]);
-
-        if (this.ws.readyState !== WebSocket.OPEN) {
-          this.openWS();
-        }
-        if (this.ws.readyState === WebSocket.OPEN) {
-          this.ws.send(msg);
-        } else {
-          throw new Error(`Send request '${command}' message: connection not opened`);
-        }
-
-        this.setResponseHandler(uuid, command, resolve, reject);
-      }),
-      120 * 1000,
-      Error(`Send request '${command}' message timeout`),
-      () => {
-        this.responseHandlers.delete(uuid);
-      },
-    );
+    return await new Promise<ResponsePayload>((resolve, reject) => {
+      uuid = crypto.randomUUID();
+      const msg = JSON.stringify([uuid, command, data]);
+
+      if (this.ws.readyState !== WebSocket.OPEN) {
+        this.openWS();
+      }
+      if (this.ws.readyState === WebSocket.OPEN) {
+        setTimeout(() => {
+          this.deleteResponseHandler(uuid);
+          return reject(new Error(`Send request '${command}' message timeout`));
+        }, 60 * 1000);
+        this.ws.send(msg);
+      } else {
+        throw new Error(`Send request '${command}' message: connection not opened`);
+      }
+
+      this.setResponseHandler(uuid, command, resolve, reject);
+    });
   }
 
   private responseHandler(messageEvent: MessageEvent<string>): void {
index 9e1277a84beaeea04d35c0ec4bcc6fd749a8b79c..07848f36d41b8a8f285ae817b117ddaf6bdb2178 100644 (file)
@@ -18,28 +18,6 @@ export const ifUndefined = <T>(value: T | undefined, isValue: T): T => {
 //   if (isIterable(obj) === false) cb();
 // };
 
-export const promiseWithTimeout = <T>(
-  promise: Promise<T>,
-  timeoutMs: number,
-  timeoutError: Error,
-  timeoutCallback: () => void = () => {
-    /* This is intentional */
-  },
-): Promise<T> => {
-  // Create a timeout promise that rejects in timeout milliseconds
-  const timeoutPromise = new Promise<never>((_, reject) => {
-    setTimeout(() => {
-      // FIXME: The original promise state shall be checked
-      timeoutCallback();
-      // FIXME: The original promise shall be canceled
-      reject(timeoutError);
-    }, timeoutMs);
-  });
-
-  // Returns a race between timeout promise and the passed promise
-  return Promise.race<T>([promise, timeoutPromise]);
-};
-
 // export const compose = <T>(...fns: ((arg: T) => T)[]): ((x: T) => T) => {
 //   return (x: T) => fns.reduceRight((y, fn) => fn(y), x);
 // };