Piment Noir Git Repositories - freqai-strategies.git/commitdiff
refactor(qav3): cleanup extrema smoothing code
author     Jérôme Benoit <jerome.benoit@piment-noir.org>
           Sat, 14 Jun 2025 18:11:09 +0000 (20:11 +0200)
committer  Jérôme Benoit <jerome.benoit@piment-noir.org>
           Sat, 14 Jun 2025 18:11:09 +0000 (20:11 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
index c5ac4b971bd053e943eb5956f84c3ba267a747bc..0c6c3ac25db850406c1176ef26b4c7ca46f7c171 100644
--- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
+++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
@@ -1084,14 +1084,19 @@ def train_objective(
     candles_step: int,
     model_training_parameters: dict,
 ) -> float:
+    def calculate_min_extrema(
+        length: int, fit_live_predictions_candles: int, min_extrema: float = 2.0
+    ) -> int:
+        return int(round((length / fit_live_predictions_candles) * min_extrema))
+
     test_ok = True
     test_length = len(X_test)
     if debug:
         n_test_minima: int = sp.signal.find_peaks(-y_test[EXTREMA_COLUMN])[0].size
         n_test_maxima: int = sp.signal.find_peaks(y_test[EXTREMA_COLUMN])[0].size
         n_test_extrema: int = n_test_minima + n_test_maxima
-        min_test_extrema: int = int(
-            round((test_length / fit_live_predictions_candles) * 2)
+        min_test_extrema: int = calculate_min_extrema(
+            test_length, fit_live_predictions_candles
         )
         logger.info(
             f"{test_length=}, {n_test_minima=}, {n_test_maxima=}, {n_test_extrema=}, {min_test_extrema=}"
@@ -1109,11 +1114,13 @@ def train_objective(
     n_test_minima = sp.signal.find_peaks(-y_test[EXTREMA_COLUMN])[0].size
     n_test_maxima = sp.signal.find_peaks(y_test[EXTREMA_COLUMN])[0].size
     n_test_extrema = n_test_minima + n_test_maxima
-    min_test_extrema: int = int(round((test_window / fit_live_predictions_candles) * 2))
+    min_test_extrema: int = calculate_min_extrema(
+        test_window, fit_live_predictions_candles
+    )
     if n_test_extrema < min_test_extrema:
         if debug:
             logger.warning(
-                f"Insufficient extrema in test data with {test_window}: {n_test_extrema} < {min_test_extrema}"
+                f"Insufficient extrema in test data with {test_window=}: {n_test_extrema=} < {min_test_extrema=}"
             )
         test_ok = False
     test_weights = test_weights[-test_window:]
@@ -1124,8 +1131,8 @@ def train_objective(
         n_train_minima: int = sp.signal.find_peaks(-y[EXTREMA_COLUMN])[0].size
         n_train_maxima: int = sp.signal.find_peaks(y[EXTREMA_COLUMN])[0].size
         n_train_extrema: int = n_train_minima + n_train_maxima
-        min_train_extrema: int = int(
-            round((train_length / fit_live_predictions_candles) * 2)
+        min_train_extrema: int = calculate_min_extrema(
+            train_length, fit_live_predictions_candles
         )
         logger.info(
             f"{train_length=}, {n_train_minima=}, {n_train_maxima=}, {n_train_extrema=}, {min_train_extrema=}"
@@ -1143,13 +1150,13 @@ def train_objective(
     n_train_minima = sp.signal.find_peaks(-y[EXTREMA_COLUMN])[0].size
     n_train_maxima = sp.signal.find_peaks(y[EXTREMA_COLUMN])[0].size
     n_train_extrema = n_train_minima + n_train_maxima
-    min_train_extrema: int = int(
-        round((train_window / fit_live_predictions_candles) * 2)
+    min_train_extrema: int = calculate_min_extrema(
+        train_window, fit_live_predictions_candles
     )
     if n_train_extrema < min_train_extrema:
         if debug:
             logger.warning(
-                f"Insufficient extrema in train data with {train_window} : {n_train_extrema} < {min_train_extrema}"
+                f"Insufficient extrema in train data with {train_window=}: {n_train_extrema=} < {min_train_extrema=}"
             )
         train_ok = False
     train_weights = train_weights[-train_window:]
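
For reference, the new calculate_min_extrema helper factors out the minimum-extrema formula that previously appeared inline in train_objective. A minimal standalone sketch; the numeric values below are illustrative and not taken from the commit:

# Same formula as the helper added above; the default min_extrema=2.0 reproduces
# the previous inline expression int(round((length / fit_live_predictions_candles) * 2)).
def calculate_min_extrema(
    length: int, fit_live_predictions_candles: int, min_extrema: float = 2.0
) -> int:
    return int(round((length / fit_live_predictions_candles) * min_extrema))

# Illustrative check: a 720-candle test window with fit_live_predictions_candles=300
# must contain at least round((720 / 300) * 2) = 5 detected extrema.
assert calculate_min_extrema(720, 300) == 5
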
diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py
index bb2db50e0bafc902405331719c32ee37b6588b64..1e16873433d1e99e112e27f08c55149135a97c53 100644
--- a/quickadapter/user_data/strategies/QuickAdapterV3.py
+++ b/quickadapter/user_data/strategies/QuickAdapterV3.py
@@ -21,6 +21,7 @@ from Utils import (
     bottom_change_percent,
     calculate_quantile,
     get_zl_ma_fn,
+    zero_phase,
     zigzag,
     ewo,
     non_zero_diff,
@@ -31,7 +32,6 @@ from Utils import (
     get_gaussian_window,
     get_odd_window,
     derive_gaussian_std_from_window,
-    zero_phase_gaussian,
     zlema,
 )
 
@@ -638,7 +638,7 @@ class QuickAdapterV3(IStrategy):
                 logger.error(
                     f"Failed to calculate KAMA for pair {pair}: {str(e)}", exc_info=True
                 )
-        return zlema(label_natr, period=trade_duration_candles).iloc[-1]
+        return label_natr.iloc[-1]
 
     def get_trade_natr(
         self, df: DataFrame, trade: Trade, trade_duration_candles: int
@@ -867,32 +867,51 @@ class QuickAdapterV3(IStrategy):
         self,
         series: Series,
         window: int,
-        std: Optional[float] = None,
     ) -> Series:
         extrema_smoothing = self.freqai_info.get("extrema_smoothing", "gaussian")
-        if std is None:
-            std = derive_gaussian_std_from_window(window)
+        extrema_smoothing_zero_phase = self.freqai_info.get(
+            "extrema_smoothing_zero_phase", True
+        )
+        extrema_smoothing_beta = float(
+            self.freqai_info.get("extrema_smoothing_beta", 10.0)
+        )
+        std = derive_gaussian_std_from_window(window)
+        if debug:
+            logger.info(
+                f"{extrema_smoothing=}, {extrema_smoothing_zero_phase=}, {extrema_smoothing_beta=}, {window=}, {std=}"
+            )
         gaussian_window = get_gaussian_window(std, True)
         odd_window = get_odd_window(window)
         smoothing_methods: dict[str, Series] = {
-            "gaussian": series.rolling(
+            "gaussian": zero_phase(
+                series=series,
+                window=window,
+                win_type="gaussian",
+                std=std,
+                beta=extrema_smoothing_beta,
+            )
+            if extrema_smoothing_zero_phase
+            else series.rolling(
                 window=gaussian_window,
                 win_type="gaussian",
                 center=True,
             ).mean(std=std),
-            "zero_phase_gaussian": zero_phase_gaussian(
-                series=series, window=window, std=std
-            ),
-            "boxcar": series.rolling(
-                window=odd_window, win_type="boxcar", center=True
-            ).mean(),
-            "triang": series.rolling(
-                window=odd_window, win_type="triang", center=True
-            ).mean(),
+            "kaiser": zero_phase(
+                series=series,
+                window=window,
+                win_type="kaiser",
+                std=std,
+                beta=extrema_smoothing_beta,
+            )
+            if extrema_smoothing_zero_phase
+            else series.rolling(
+                window=odd_window,
+                win_type="kaiser",
+                center=True,
+            ).mean(beta=extrema_smoothing_beta),
             "smm": series.rolling(window=odd_window, center=True).median(),
             "sma": series.rolling(window=odd_window, center=True).mean(),
             "ewma": series.ewm(span=window).mean(),
-            "zlewma": zlema(series, period=window),
         }
         return smoothing_methods.get(
             extrema_smoothing,
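
The reworked smoothing selection above reads three keys from freqai_info: extrema_smoothing (the method name), extrema_smoothing_zero_phase (defaults to True) and extrema_smoothing_beta (defaults to 10.0, the Kaiser shape parameter). A hedged configuration sketch as a Python dict; only the key names, defaults and method names come from the diff, the chosen values are illustrative:

# Illustrative freqai configuration fragment exercising the new smoothing options.
freqai_info = {
    # One of the registered smoothing_methods:
    # "gaussian", "kaiser", "smm", "sma" or "ewma".
    "extrema_smoothing": "kaiser",
    # True: "gaussian"/"kaiser" use the zero_phase (filtfilt) smoother;
    # False: they fall back to a centered pandas rolling-window mean.
    "extrema_smoothing_zero_phase": True,
    # Kaiser window shape parameter, forwarded as beta.
    "extrema_smoothing_beta": 10.0,
}
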
diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py
index f03547ef64d557ebaba571f555d63795c12e4da2..7ceef25092e5df72f4875a602ca8f38abb082cf0 100644
--- a/quickadapter/user_data/strategies/Utils.py
+++ b/quickadapter/user_data/strategies/Utils.py
@@ -5,7 +5,7 @@ import numpy as np
 import pandas as pd
 import scipy as sp
 import talib.abstract as ta
-from typing import Callable, Union
+from typing import Callable, Literal, Union
 from technical import qtpylib
 
 
@@ -45,18 +45,31 @@ def derive_gaussian_std_from_window(window: int) -> float:
 
 
 @lru_cache(maxsize=64)
-def _calculate_gaussian_coeffs(window: int, std: float) -> np.ndarray:
-    gaussian_coeffs = sp.signal.windows.gaussian(M=window, std=std, sym=True)
-    return gaussian_coeffs / np.sum(gaussian_coeffs)
+def _calculate_coeffs(
+    window: int, win_type: Literal["gaussian", "kaiser"], std: float, beta: float
+) -> np.ndarray:
+    if win_type == "gaussian":
+        coeffs = sp.signal.windows.gaussian(M=window, std=std, sym=True)
+    elif win_type == "kaiser":
+        coeffs = sp.signal.windows.kaiser(M=window, beta=beta, sym=True)
+    else:
+        raise ValueError(f"Unknown window type: {win_type}")
+    return coeffs / np.sum(coeffs)
 
 
-def zero_phase_gaussian(series: pd.Series, window: int, std: float) -> pd.Series:
+def zero_phase(
+    series: pd.Series,
+    window: int,
+    win_type: Literal["gaussian", "kaiser"],
+    std: float,
+    beta: float,
+) -> pd.Series:
     if len(series) == 0:
         return series
     if len(series) < window:
         raise ValueError("Series length must be greater than or equal to window size")
     series_values = series.to_numpy()
-    b = _calculate_gaussian_coeffs(window, std)
+    b = _calculate_coeffs(window=window, win_type=win_type, std=std, beta=beta)
     a = 1.0
     filtered_values = sp.signal.filtfilt(b, a, series_values)
     return pd.Series(filtered_values, index=series.index)
@@ -165,7 +178,7 @@ def zlema(series: pd.Series, period: int) -> pd.Series:
     lag = max((period - 1) / 2, 0)
     alpha = 2 / (period + 1)
     zl_series = 2 * series - series.shift(int(lag))
-    return zl_series.ewm(alpha=alpha).mean()
+    return zl_series.ewm(alpha=alpha, adjust=False).mean()
 
 
 def _fractal_dimension(highs: np.ndarray, lows: np.ndarray, period: int) -> float:
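
The generalized zero_phase helper keeps the behaviour of the removed zero_phase_gaussian (normalized window coefficients applied forward and backward with scipy.signal.filtfilt, so the smoothed series has no phase lag) and adds a Kaiser window option. A self-contained sketch of the same idea on synthetic data; the series and parameter values are illustrative:

import numpy as np
import pandas as pd
import scipy as sp

# Synthetic noisy series standing in for the extrema column.
rng = np.random.default_rng(0)
series = pd.Series(np.sin(np.linspace(0.0, 6.0 * np.pi, 256)) + rng.normal(0.0, 0.1, 256))

window, std, beta = 9, 2.0, 10.0

# Normalized window coefficients, as computed by _calculate_coeffs().
b_gaussian = sp.signal.windows.gaussian(M=window, std=std, sym=True)
b_gaussian /= np.sum(b_gaussian)
b_kaiser = sp.signal.windows.kaiser(M=window, beta=beta, sym=True)
b_kaiser /= np.sum(b_kaiser)

# Forward-backward FIR filtering (a = 1.0), as in zero_phase(), cancels phase lag.
smoothed_gaussian = pd.Series(
    sp.signal.filtfilt(b_gaussian, 1.0, series.to_numpy()), index=series.index
)
smoothed_kaiser = pd.Series(
    sp.signal.filtfilt(b_kaiser, 1.0, series.to_numpy()), index=series.index
)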