From: Jérôme Benoit Date: Tue, 25 Mar 2025 10:50:04 +0000 (+0100) Subject: fix(qav3): fix FRAMA implementation X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=5e29902eb00b285682998375bbf848fd9ab97ef5;p=freqai-strategies.git fix(qav3): fix FRAMA implementation Signed-off-by: Jérôme Benoit --- diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 7f345f2..efaa78f 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -140,74 +140,65 @@ def get_ma_fn(mamode: str) -> callable: return mamodes.get(mamode, mamodes["sma"]) -def fractal_dimension( - prices_array: np.ndarray, period: int, normalize: bool = False -) -> float: - """ - Calculate fractal dimension of a price window, with optional normalization. - - Args: - window: Array of prices for the current window. - period: Window size (must be even). - normalize: If True, normalize HL values by their window lengths. - - Returns: - Fractal dimension (D) clipped between 1.0 and 2.0. - """ +def _fractal_dimension(high: np.ndarray, low: np.ndarray, period: int) -> float: + """Original fractal dimension computation implementation per Ehlers' paper.""" if period % 2 != 0: - raise ValueError("FRAMA period must be even") + raise ValueError("period must be even") half_period = period // 2 - if half_period < 1 or len(prices_array) < period: - return 1.0 - prices_first_half = prices_array[:half_period] - prices_second_half = prices_array[half_period:] - HL1 = np.max(prices_first_half) - np.min(prices_first_half) - HL2 = np.max(prices_second_half) - np.min(prices_second_half) - HL3 = np.max(prices_array) - np.min(prices_array) + H1 = np.max(high[:half_period]) + L1 = np.min(low[:half_period]) - if normalize: - HL1 /= half_period - HL2 /= half_period - HL3 /= period + H2 = np.max(high[half_period:]) + L2 = np.min(low[half_period:]) + + H3 = np.max(high) + L3 = np.min(low) - if HL1 + HL2 == 0 or HL3 == 0: + HL1 = H1 - L1 + HL2 = H2 - L2 + HL3 = H3 - L3 + + if (HL1 + HL2) == 0 or HL3 == 0: return 1.0 - D = (np.log(HL1 + HL2) - np.log(HL3)) / np.log(2) + D = (np.log(HL1 + HL2) - np.log(HL3)) / np.log(2) + 1 return np.clip(D, 1.0, 2.0) -def frama(series: pd.Series, period: int = 16, normalize: bool = False) -> pd.Series: +def frama(df: pd.DataFrame, period: int = 16) -> pd.Series: """ - Calculate FRAMA with optional normalization. - - Args: - series: Pandas Series of prices. - period: Lookback window (default=16). - normalize: Enable range normalization (default=False). - - Returns: - FRAMA values as a Pandas Series. + Original FRAMA implementation per Ehlers' paper. """ if period % 2 != 0: - raise ValueError("FRAMA period must be even") + raise ValueError("period must be even") - frama = pd.Series(np.nan, index=series.index) + high = df["high"] + low = df["low"] + close = df["close"] - fractal_d = series.rolling(window=period, min_periods=period).apply( - lambda arr: fractal_dimension(arr, period, normalize), raw=True - ) - alpha = np.exp(-4.6 * (fractal_d - 1)) + fd = pd.Series(np.nan, index=close.index) + for i in range(period, len(close)): + window_high = high.iloc[i - period : i] + window_low = low.iloc[i - period : i] + + if len(window_high) != period: + continue + + fd.iloc[i] = _fractal_dimension(window_high.values, window_low.values, period) + + alpha = np.exp(-4.6 * (fd - 1)) + alpha = np.clip(alpha, 0.01, 1) - for i in range(period - 1, len(series)): - if np.isnan(frama.iloc[i - 1]): - frama.iloc[i] = series.iloc[i] - else: - frama.iloc[i] = ( - alpha.iloc[i] * series.iloc[i] + (1 - alpha.iloc[i]) * frama.iloc[i - 1] - ) + frama = pd.Series(np.nan, index=close.index) + frama.iloc[period - 1] = close.iloc[:period].mean() + for i in range(period, len(close)): + if pd.isna(frama.iloc[i - 1]) or pd.isna(alpha.iloc[i]): + continue + frama.iloc[i] = ( + alpha.iloc[i] * close.iloc[i] + (1 - alpha.iloc[i]) * frama.iloc[i - 1] + ) return frama @@ -225,7 +216,7 @@ def smma(series: pd.Series, period: int, zero_lag=False, offset=0) -> pd.Series: if zero_lag: series = zero_lag_series(series, timeperiod=period) - smma = pd.Series(index=series.index, dtype="float64") + smma = pd.Series(np.nan, index=series.index) smma.iloc[period - 1] = series.iloc[:period].mean() for i in range(period, len(series)):