From: Jérôme Benoit Date: Wed, 25 Jun 2025 14:11:49 +0000 (+0200) Subject: refactor(qav3): refine typing X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=d104d545cd7e222454581bf268842fcbdb4fef37;p=freqai-strategies.git refactor(qav3): refine typing Signed-off-by: Jérôme Benoit --- diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index 5b135aa..ec83444 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -1103,7 +1103,7 @@ def fit_regressor( def calculate_min_extrema( size: int, fit_live_predictions_candles: int, min_extrema: int = 4 ) -> int: - return int(round((size / fit_live_predictions_candles) * min_extrema)) + return int(round(size / fit_live_predictions_candles) * min_extrema) def train_objective( @@ -1123,8 +1123,9 @@ def train_objective( test_ok = True test_length = len(X_test) if debug: - n_test_minima: int = sp.signal.find_peaks(-y_test[EXTREMA_COLUMN])[0].size - n_test_maxima: int = sp.signal.find_peaks(y_test[EXTREMA_COLUMN])[0].size + test_extrema = y_test.get(EXTREMA_COLUMN) + n_test_minima: int = sp.signal.find_peaks(-test_extrema)[0].size + n_test_maxima: int = sp.signal.find_peaks(test_extrema)[0].size n_test_extrema: int = n_test_minima + n_test_maxima min_test_extrema: int = calculate_min_extrema( test_length, fit_live_predictions_candles @@ -1142,8 +1143,9 @@ def train_objective( ) X_test = X_test.iloc[-test_window:] y_test = y_test.iloc[-test_window:] - n_test_minima = sp.signal.find_peaks(-y_test[EXTREMA_COLUMN])[0].size - n_test_maxima = sp.signal.find_peaks(y_test[EXTREMA_COLUMN])[0].size + test_extrema = y_test.get(EXTREMA_COLUMN) + n_test_minima = sp.signal.find_peaks(-test_extrema)[0].size + n_test_maxima = sp.signal.find_peaks(test_extrema)[0].size n_test_extrema = n_test_minima + n_test_maxima min_test_extrema: int = calculate_min_extrema( test_window, fit_live_predictions_candles @@ -1159,8 +1161,9 @@ def train_objective( train_ok = True train_length = len(X) if debug: - n_train_minima: int = sp.signal.find_peaks(-y[EXTREMA_COLUMN])[0].size - n_train_maxima: int = sp.signal.find_peaks(y[EXTREMA_COLUMN])[0].size + train_extrema = y.get(EXTREMA_COLUMN) + n_train_minima: int = sp.signal.find_peaks(-train_extrema)[0].size + n_train_maxima: int = sp.signal.find_peaks(train_extrema)[0].size n_train_extrema: int = n_train_minima + n_train_maxima min_train_extrema: int = calculate_min_extrema( train_length, fit_live_predictions_candles @@ -1178,8 +1181,9 @@ def train_objective( ) X = X.iloc[-train_window:] y = y.iloc[-train_window:] - n_train_minima = sp.signal.find_peaks(-y[EXTREMA_COLUMN])[0].size - n_train_maxima = sp.signal.find_peaks(y[EXTREMA_COLUMN])[0].size + train_extrema = y.get(EXTREMA_COLUMN) + n_train_minima = sp.signal.find_peaks(-train_extrema)[0].size + n_train_maxima = sp.signal.find_peaks(train_extrema)[0].size n_train_extrema = n_train_minima + n_train_maxima min_train_extrema: int = calculate_min_extrema( train_window, fit_live_predictions_candles @@ -1393,15 +1397,15 @@ def zigzag( df: pd.DataFrame, natr_period: int = 14, natr_ratio: float = 6.0, -) -> tuple[list[int], list[float], list[int], list[float]]: +) -> tuple[list[int], list[float], list[TrendDirection], list[float]]: n = len(df) if df.empty or n < natr_period: return [], [], [], [] natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy() - indices = df.index.tolist() - thresholds = natr_values * natr_ratio + indices: list[int] = df.index.tolist() + thresholds: np.ndarray = natr_values * natr_ratio closes = df.get("close").to_numpy() highs = df.get("high").to_numpy() lows = df.get("low").to_numpy() @@ -1410,7 +1414,7 @@ def zigzag( pivots_indices: list[int] = [] pivots_values: list[float] = [] - pivots_directions: list[int] = [] + pivots_directions: list[TrendDirection] = [] pivots_scaled_natrs: list[float] = [] last_pivot_pos: int = -1 @@ -1465,7 +1469,7 @@ def zigzag( last_pivot_pos = pos reset_candidate_pivot() - slope_ok_cache: dict[tuple[int, int, int, float], bool] = {} + slope_ok_cache: dict[tuple[int, int, TrendDirection, float], bool] = {} def get_slope_ok( pos: int, @@ -1476,7 +1480,7 @@ def zigzag( cache_key = ( pos, candidate_pivot_pos, - direction.value, + direction, min_slope, ) @@ -1510,7 +1514,7 @@ def zigzag( min_slope: float = np.finfo(float).eps, alpha: float = 0.05, ) -> bool: - start_pos = candidate_pivot_pos + 1 + start_pos = min(candidate_pivot_pos + 1, n) end_pos = min(pos + 1, n) n_slopes = max(0, end_pos - start_pos) diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index 130e46f..07a6bd5 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -17,6 +17,7 @@ import pandas_ta as pta import scipy as sp from Utils import ( + TrendDirection, alligator, bottom_change_percent, calculate_quantile, @@ -440,8 +441,12 @@ class QuickAdapterV3(IStrategy): else: for pivot_idx, pivot_dir in zip(pivots_indices, pivots_directions): dataframe.at[pivot_idx, EXTREMA_COLUMN] = pivot_dir - dataframe["minima"] = np.where(dataframe[EXTREMA_COLUMN] == -1, -1, 0) - dataframe["maxima"] = np.where(dataframe[EXTREMA_COLUMN] == 1, 1, 0) + dataframe["minima"] = np.where( + dataframe[EXTREMA_COLUMN] == TrendDirection.DOWN, -1, 0 + ) + dataframe["maxima"] = np.where( + dataframe[EXTREMA_COLUMN] == TrendDirection.UP, 1, 0 + ) logger.info( f"{pair}: labeled {len(pivots_indices)} extrema (label_period={QuickAdapterV3.td_format(label_period)} / {label_period_candles=} / {label_natr_ratio=:.2f})" ) diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 42f73e1..11fcc31 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -404,15 +404,15 @@ def zigzag( df: pd.DataFrame, natr_period: int = 14, natr_ratio: float = 6.0, -) -> tuple[list[int], list[float], list[int], list[float]]: +) -> tuple[list[int], list[float], list[TrendDirection], list[float]]: n = len(df) if df.empty or n < natr_period: return [], [], [], [] natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy() - indices = df.index.tolist() - thresholds = natr_values * natr_ratio + indices: list[int] = df.index.tolist() + thresholds: np.ndarray = natr_values * natr_ratio closes = df.get("close").to_numpy() highs = df.get("high").to_numpy() lows = df.get("low").to_numpy() @@ -421,7 +421,7 @@ def zigzag( pivots_indices: list[int] = [] pivots_values: list[float] = [] - pivots_directions: list[int] = [] + pivots_directions: list[TrendDirection] = [] pivots_scaled_natrs: list[float] = [] last_pivot_pos: int = -1 @@ -476,7 +476,7 @@ def zigzag( last_pivot_pos = pos reset_candidate_pivot() - slope_ok_cache: dict[tuple[int, int, int, float], bool] = {} + slope_ok_cache: dict[tuple[int, int, TrendDirection, float], bool] = {} def get_slope_ok( pos: int, @@ -487,7 +487,7 @@ def zigzag( cache_key = ( pos, candidate_pivot_pos, - direction.value, + direction, min_slope, ) @@ -521,7 +521,7 @@ def zigzag( min_slope: float = np.finfo(float).eps, alpha: float = 0.05, ) -> bool: - start_pos = candidate_pivot_pos + 1 + start_pos = min(candidate_pivot_pos + 1, n) end_pos = min(pos + 1, n) n_slopes = max(0, end_pos - start_pos)