From 4278c64cfbc787e7ea70f36e77ef27b0c0608688 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 14 Jun 2025 20:11:09 +0200 Subject: [PATCH] refactor(qav3): cleanup extrema smoothing code MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../freqaimodels/QuickAdapterRegressorV3.py | 25 +++++---- .../user_data/strategies/QuickAdapterV3.py | 51 +++++++++++++------ quickadapter/user_data/strategies/Utils.py | 27 +++++++--- 3 files changed, 71 insertions(+), 32 deletions(-) diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index c5ac4b9..0c6c3ac 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -1084,14 +1084,19 @@ def train_objective( candles_step: int, model_training_parameters: dict, ) -> float: + def calculate_min_extrema( + length: int, fit_live_predictions_candles: int, min_extrema: float = 2.0 + ) -> int: + return int(round((length / fit_live_predictions_candles) * min_extrema)) + test_ok = True test_length = len(X_test) if debug: n_test_minima: int = sp.signal.find_peaks(-y_test[EXTREMA_COLUMN])[0].size n_test_maxima: int = sp.signal.find_peaks(y_test[EXTREMA_COLUMN])[0].size n_test_extrema: int = n_test_minima + n_test_maxima - min_test_extrema: int = int( - round((test_length / fit_live_predictions_candles) * 2) + min_test_extrema: int = calculate_min_extrema( + test_length, fit_live_predictions_candles ) logger.info( f"{test_length=}, {n_test_minima=}, {n_test_maxima=}, {n_test_extrema=}, {min_test_extrema=}" @@ -1109,11 +1114,13 @@ def train_objective( n_test_minima = sp.signal.find_peaks(-y_test[EXTREMA_COLUMN])[0].size n_test_maxima = sp.signal.find_peaks(y_test[EXTREMA_COLUMN])[0].size n_test_extrema = n_test_minima + n_test_maxima - min_test_extrema: int = int(round((test_window / fit_live_predictions_candles) * 2)) + min_test_extrema: int = calculate_min_extrema( + test_window, fit_live_predictions_candles + ) if n_test_extrema < min_test_extrema: if debug: logger.warning( - f"Insufficient extrema in test data with {test_window}: {n_test_extrema} < {min_test_extrema}" + f"Insufficient extrema in test data with {test_window=}: {n_test_extrema=} < {min_test_extrema=}" ) test_ok = False test_weights = test_weights[-test_window:] @@ -1124,8 +1131,8 @@ def train_objective( n_train_minima: int = sp.signal.find_peaks(-y[EXTREMA_COLUMN])[0].size n_train_maxima: int = sp.signal.find_peaks(y[EXTREMA_COLUMN])[0].size n_train_extrema: int = n_train_minima + n_train_maxima - min_train_extrema: int = int( - round((train_length / fit_live_predictions_candles) * 2) + min_train_extrema: int = calculate_min_extrema( + train_length, fit_live_predictions_candles ) logger.info( f"{train_length=}, {n_train_minima=}, {n_train_maxima=}, {n_train_extrema=}, {min_train_extrema=}" @@ -1143,13 +1150,13 @@ def train_objective( n_train_minima = sp.signal.find_peaks(-y[EXTREMA_COLUMN])[0].size n_train_maxima = sp.signal.find_peaks(y[EXTREMA_COLUMN])[0].size n_train_extrema = n_train_minima + n_train_maxima - min_train_extrema: int = int( - round((train_window / fit_live_predictions_candles) * 2) + min_train_extrema: int = calculate_min_extrema( + train_window, fit_live_predictions_candles ) if n_train_extrema < min_train_extrema: if debug: logger.warning( - f"Insufficient extrema in train data with {train_window} : {n_train_extrema} < {min_train_extrema}" + f"Insufficient extrema in train data with {train_window=}: {n_train_extrema=} < {min_train_extrema=}" ) train_ok = False train_weights = train_weights[-train_window:] diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index bb2db50..1e16873 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -21,6 +21,7 @@ from Utils import ( bottom_change_percent, calculate_quantile, get_zl_ma_fn, + zero_phase, zigzag, ewo, non_zero_diff, @@ -31,7 +32,6 @@ from Utils import ( get_gaussian_window, get_odd_window, derive_gaussian_std_from_window, - zero_phase_gaussian, zlema, ) @@ -638,7 +638,7 @@ class QuickAdapterV3(IStrategy): logger.error( f"Failed to calculate KAMA for pair {pair}: {str(e)}", exc_info=True ) - return zlema(label_natr, period=trade_duration_candles).iloc[-1] + return label_natr.iloc[-1] def get_trade_natr( self, df: DataFrame, trade: Trade, trade_duration_candles: int @@ -867,32 +867,51 @@ class QuickAdapterV3(IStrategy): self, series: Series, window: int, - std: Optional[float] = None, ) -> Series: extrema_smoothing = self.freqai_info.get("extrema_smoothing", "gaussian") - if std is None: - std = derive_gaussian_std_from_window(window) + extrema_smoothing_zero_phase = self.freqai_info.get( + "extrema_smoothing_zero_phase", True + ) + extrema_smoothing_beta = float( + self.freqai_info.get("extrema_smoothing_beta", 10.0) + ) + std = derive_gaussian_std_from_window(window) + if debug: + logger.info( + f"{extrema_smoothing=}, {extrema_smoothing_zero_phase=}, {extrema_smoothing_beta=}, {window=}, {std=}" + ) gaussian_window = get_gaussian_window(std, True) odd_window = get_odd_window(window) smoothing_methods: dict[str, Series] = { - "gaussian": series.rolling( + "gaussian": zero_phase( + series=series, + window=window, + win_type="gaussian", + std=std, + beta=extrema_smoothing_beta, + ) + if extrema_smoothing_zero_phase + else series.rolling( window=gaussian_window, win_type="gaussian", center=True, ).mean(std=std), - "zero_phase_gaussian": zero_phase_gaussian( - series=series, window=window, std=std - ), - "boxcar": series.rolling( - window=odd_window, win_type="boxcar", center=True - ).mean(), - "triang": series.rolling( - window=odd_window, win_type="triang", center=True - ).mean(), + "kaiser": zero_phase( + series=series, + window=window, + win_type="kaiser", + std=std, + beta=extrema_smoothing_beta, + ) + if extrema_smoothing_zero_phase + else series.rolling( + window=odd_window, + win_type="kaiser", + center=True, + ).mean(beta=extrema_smoothing_beta), "smm": series.rolling(window=odd_window, center=True).median(), "sma": series.rolling(window=odd_window, center=True).mean(), "ewma": series.ewm(span=window).mean(), - "zlewma": zlema(series, period=window), } return smoothing_methods.get( extrema_smoothing, diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index f03547e..7ceef25 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -5,7 +5,7 @@ import numpy as np import pandas as pd import scipy as sp import talib.abstract as ta -from typing import Callable, Union +from typing import Callable, Literal, Union from technical import qtpylib @@ -45,18 +45,31 @@ def derive_gaussian_std_from_window(window: int) -> float: @lru_cache(maxsize=64) -def _calculate_gaussian_coeffs(window: int, std: float) -> np.ndarray: - gaussian_coeffs = sp.signal.windows.gaussian(M=window, std=std, sym=True) - return gaussian_coeffs / np.sum(gaussian_coeffs) +def _calculate_coeffs( + window: int, win_type: Literal["gaussian", "kaiser"], std: float, beta: float +) -> np.ndarray: + if win_type == "gaussian": + coeffs = sp.signal.windows.gaussian(M=window, std=std, sym=True) + elif win_type == "kaiser": + coeffs = sp.signal.windows.kaiser(M=window, beta=beta, sym=True) + else: + raise ValueError(f"Unknown window type: {win_type}") + return coeffs / np.sum(coeffs) -def zero_phase_gaussian(series: pd.Series, window: int, std: float) -> pd.Series: +def zero_phase( + series: pd.Series, + window: int, + win_type: Literal["gaussian", "kaiser"], + std: float, + beta: float, +) -> pd.Series: if len(series) == 0: return series if len(series) < window: raise ValueError("Series length must be greater than or equal to window size") series_values = series.to_numpy() - b = _calculate_gaussian_coeffs(window, std) + b = _calculate_coeffs(window=window, win_type=win_type, std=std, beta=beta) a = 1.0 filtered_values = sp.signal.filtfilt(b, a, series_values) return pd.Series(filtered_values, index=series.index) @@ -165,7 +178,7 @@ def zlema(series: pd.Series, period: int) -> pd.Series: lag = max((period - 1) / 2, 0) alpha = 2 / (period + 1) zl_series = 2 * series - series.shift(int(lag)) - return zl_series.ewm(alpha=alpha).mean() + return zl_series.ewm(alpha=alpha, adjust=False).mean() def _fractal_dimension(highs: np.ndarray, lows: np.ndarray, period: int) -> float: -- 2.43.0