]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
fix(qav3): fix FRAMA implementation
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 25 Mar 2025 10:50:04 +0000 (11:50 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 25 Mar 2025 10:50:04 +0000 (11:50 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/strategies/Utils.py

index 7f345f25d788ab28b420a67fcca8a1529beefa4f..efaa78fda63bd849de0a84145aa530e29ec8f2a7 100644 (file)
@@ -140,74 +140,65 @@ def get_ma_fn(mamode: str) -> callable:
     return mamodes.get(mamode, mamodes["sma"])
 
 
-def fractal_dimension(
-    prices_array: np.ndarray, period: int, normalize: bool = False
-) -> float:
-    """
-    Calculate fractal dimension of a price window, with optional normalization.
-
-    Args:
-        window: Array of prices for the current window.
-        period: Window size (must be even).
-        normalize: If True, normalize HL values by their window lengths.
-
-    Returns:
-        Fractal dimension (D) clipped between 1.0 and 2.0.
-    """
+def _fractal_dimension(high: np.ndarray, low: np.ndarray, period: int) -> float:
+    """Original fractal dimension computation implementation per Ehlers' paper."""
     if period % 2 != 0:
-        raise ValueError("FRAMA period must be even")
+        raise ValueError("period must be even")
 
     half_period = period // 2
-    if half_period < 1 or len(prices_array) < period:
-        return 1.0
-    prices_first_half = prices_array[:half_period]
-    prices_second_half = prices_array[half_period:]
 
-    HL1 = np.max(prices_first_half) - np.min(prices_first_half)
-    HL2 = np.max(prices_second_half) - np.min(prices_second_half)
-    HL3 = np.max(prices_array) - np.min(prices_array)
+    H1 = np.max(high[:half_period])
+    L1 = np.min(low[:half_period])
 
-    if normalize:
-        HL1 /= half_period
-        HL2 /= half_period
-        HL3 /= period
+    H2 = np.max(high[half_period:])
+    L2 = np.min(low[half_period:])
+
+    H3 = np.max(high)
+    L3 = np.min(low)
 
-    if HL1 + HL2 == 0 or HL3 == 0:
+    HL1 = H1 - L1
+    HL2 = H2 - L2
+    HL3 = H3 - L3
+
+    if (HL1 + HL2) == 0 or HL3 == 0:
         return 1.0
 
-    D = (np.log(HL1 + HL2) - np.log(HL3)) / np.log(2)
+    D = (np.log(HL1 + HL2) - np.log(HL3)) / np.log(2) + 1
     return np.clip(D, 1.0, 2.0)
 
 
-def frama(series: pd.Series, period: int = 16, normalize: bool = False) -> pd.Series:
+def frama(df: pd.DataFrame, period: int = 16) -> pd.Series:
     """
-    Calculate FRAMA with optional normalization.
-
-    Args:
-        series: Pandas Series of prices.
-        period: Lookback window (default=16).
-        normalize: Enable range normalization (default=False).
-
-    Returns:
-        FRAMA values as a Pandas Series.
+    Original FRAMA implementation per Ehlers' paper.
     """
     if period % 2 != 0:
-        raise ValueError("FRAMA period must be even")
+        raise ValueError("period must be even")
 
-    frama = pd.Series(np.nan, index=series.index)
+    high = df["high"]
+    low = df["low"]
+    close = df["close"]
 
-    fractal_d = series.rolling(window=period, min_periods=period).apply(
-        lambda arr: fractal_dimension(arr, period, normalize), raw=True
-    )
-    alpha = np.exp(-4.6 * (fractal_d - 1))
+    fd = pd.Series(np.nan, index=close.index)
+    for i in range(period, len(close)):
+        window_high = high.iloc[i - period : i]
+        window_low = low.iloc[i - period : i]
+
+        if len(window_high) != period:
+            continue
+
+        fd.iloc[i] = _fractal_dimension(window_high.values, window_low.values, period)
+
+    alpha = np.exp(-4.6 * (fd - 1))
+    alpha = np.clip(alpha, 0.01, 1)
 
-    for i in range(period - 1, len(series)):
-        if np.isnan(frama.iloc[i - 1]):
-            frama.iloc[i] = series.iloc[i]
-        else:
-            frama.iloc[i] = (
-                alpha.iloc[i] * series.iloc[i] + (1 - alpha.iloc[i]) * frama.iloc[i - 1]
-            )
+    frama = pd.Series(np.nan, index=close.index)
+    frama.iloc[period - 1] = close.iloc[:period].mean()
+    for i in range(period, len(close)):
+        if pd.isna(frama.iloc[i - 1]) or pd.isna(alpha.iloc[i]):
+            continue
+        frama.iloc[i] = (
+            alpha.iloc[i] * close.iloc[i] + (1 - alpha.iloc[i]) * frama.iloc[i - 1]
+        )
 
     return frama
 
@@ -225,7 +216,7 @@ def smma(series: pd.Series, period: int, zero_lag=False, offset=0) -> pd.Series:
 
     if zero_lag:
         series = zero_lag_series(series, timeperiod=period)
-    smma = pd.Series(index=series.index, dtype="float64")
+    smma = pd.Series(np.nan, index=series.index)
     smma.iloc[period - 1] = series.iloc[:period].mean()
 
     for i in range(period, len(series)):