]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
refactor(qav3): refine typing
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 25 Jun 2025 14:11:49 +0000 (16:11 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 25 Jun 2025 14:11:49 +0000 (16:11 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index 5b135aa7bf4b68c6f0a73ee0ba1b02538b2716bf..ec83444f7b810b09afada763ed4be9f1cb48f843 100644 (file)
@@ -1103,7 +1103,7 @@ def fit_regressor(
 def calculate_min_extrema(
     size: int, fit_live_predictions_candles: int, min_extrema: int = 4
 ) -> int:
-    return int(round((size / fit_live_predictions_candles) * min_extrema))
+    return int(round(size / fit_live_predictions_candles) * min_extrema)
 
 
 def train_objective(
@@ -1123,8 +1123,9 @@ def train_objective(
     test_ok = True
     test_length = len(X_test)
     if debug:
-        n_test_minima: int = sp.signal.find_peaks(-y_test[EXTREMA_COLUMN])[0].size
-        n_test_maxima: int = sp.signal.find_peaks(y_test[EXTREMA_COLUMN])[0].size
+        test_extrema = y_test.get(EXTREMA_COLUMN)
+        n_test_minima: int = sp.signal.find_peaks(-test_extrema)[0].size
+        n_test_maxima: int = sp.signal.find_peaks(test_extrema)[0].size
         n_test_extrema: int = n_test_minima + n_test_maxima
         min_test_extrema: int = calculate_min_extrema(
             test_length, fit_live_predictions_candles
@@ -1142,8 +1143,9 @@ def train_objective(
     )
     X_test = X_test.iloc[-test_window:]
     y_test = y_test.iloc[-test_window:]
-    n_test_minima = sp.signal.find_peaks(-y_test[EXTREMA_COLUMN])[0].size
-    n_test_maxima = sp.signal.find_peaks(y_test[EXTREMA_COLUMN])[0].size
+    test_extrema = y_test.get(EXTREMA_COLUMN)
+    n_test_minima = sp.signal.find_peaks(-test_extrema)[0].size
+    n_test_maxima = sp.signal.find_peaks(test_extrema)[0].size
     n_test_extrema = n_test_minima + n_test_maxima
     min_test_extrema: int = calculate_min_extrema(
         test_window, fit_live_predictions_candles
@@ -1159,8 +1161,9 @@ def train_objective(
     train_ok = True
     train_length = len(X)
     if debug:
-        n_train_minima: int = sp.signal.find_peaks(-y[EXTREMA_COLUMN])[0].size
-        n_train_maxima: int = sp.signal.find_peaks(y[EXTREMA_COLUMN])[0].size
+        train_extrema = y.get(EXTREMA_COLUMN)
+        n_train_minima: int = sp.signal.find_peaks(-train_extrema)[0].size
+        n_train_maxima: int = sp.signal.find_peaks(train_extrema)[0].size
         n_train_extrema: int = n_train_minima + n_train_maxima
         min_train_extrema: int = calculate_min_extrema(
             train_length, fit_live_predictions_candles
@@ -1178,8 +1181,9 @@ def train_objective(
     )
     X = X.iloc[-train_window:]
     y = y.iloc[-train_window:]
-    n_train_minima = sp.signal.find_peaks(-y[EXTREMA_COLUMN])[0].size
-    n_train_maxima = sp.signal.find_peaks(y[EXTREMA_COLUMN])[0].size
+    train_extrema = y.get(EXTREMA_COLUMN)
+    n_train_minima = sp.signal.find_peaks(-train_extrema)[0].size
+    n_train_maxima = sp.signal.find_peaks(train_extrema)[0].size
     n_train_extrema = n_train_minima + n_train_maxima
     min_train_extrema: int = calculate_min_extrema(
         train_window, fit_live_predictions_candles
@@ -1393,15 +1397,15 @@ def zigzag(
     df: pd.DataFrame,
     natr_period: int = 14,
     natr_ratio: float = 6.0,
-) -> tuple[list[int], list[float], list[int], list[float]]:
+) -> tuple[list[int], list[float], list[TrendDirection], list[float]]:
     n = len(df)
     if df.empty or n < natr_period:
         return [], [], [], []
 
     natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy()
 
-    indices = df.index.tolist()
-    thresholds = natr_values * natr_ratio
+    indices: list[int] = df.index.tolist()
+    thresholds: np.ndarray = natr_values * natr_ratio
     closes = df.get("close").to_numpy()
     highs = df.get("high").to_numpy()
     lows = df.get("low").to_numpy()
@@ -1410,7 +1414,7 @@ def zigzag(
 
     pivots_indices: list[int] = []
     pivots_values: list[float] = []
-    pivots_directions: list[int] = []
+    pivots_directions: list[TrendDirection] = []
     pivots_scaled_natrs: list[float] = []
     last_pivot_pos: int = -1
 
@@ -1465,7 +1469,7 @@ def zigzag(
         last_pivot_pos = pos
         reset_candidate_pivot()
 
-    slope_ok_cache: dict[tuple[int, int, int, float], bool] = {}
+    slope_ok_cache: dict[tuple[int, int, TrendDirection, float], bool] = {}
 
     def get_slope_ok(
         pos: int,
@@ -1476,7 +1480,7 @@ def zigzag(
         cache_key = (
             pos,
             candidate_pivot_pos,
-            direction.value,
+            direction,
             min_slope,
         )
 
@@ -1510,7 +1514,7 @@ def zigzag(
         min_slope: float = np.finfo(float).eps,
         alpha: float = 0.05,
     ) -> bool:
-        start_pos = candidate_pivot_pos + 1
+        start_pos = min(candidate_pivot_pos + 1, n)
         end_pos = min(pos + 1, n)
         n_slopes = max(0, end_pos - start_pos)
 
index 130e46fc9f61625541dce1655478941dd039bb02..07a6bd57a5e2a08aa11305d34af1e68855bc9425 100644 (file)
@@ -17,6 +17,7 @@ import pandas_ta as pta
 import scipy as sp
 
 from Utils import (
+    TrendDirection,
     alligator,
     bottom_change_percent,
     calculate_quantile,
@@ -440,8 +441,12 @@ class QuickAdapterV3(IStrategy):
         else:
             for pivot_idx, pivot_dir in zip(pivots_indices, pivots_directions):
                 dataframe.at[pivot_idx, EXTREMA_COLUMN] = pivot_dir
-            dataframe["minima"] = np.where(dataframe[EXTREMA_COLUMN] == -1, -1, 0)
-            dataframe["maxima"] = np.where(dataframe[EXTREMA_COLUMN] == 1, 1, 0)
+            dataframe["minima"] = np.where(
+                dataframe[EXTREMA_COLUMN] == TrendDirection.DOWN, -1, 0
+            )
+            dataframe["maxima"] = np.where(
+                dataframe[EXTREMA_COLUMN] == TrendDirection.UP, 1, 0
+            )
             logger.info(
                 f"{pair}: labeled {len(pivots_indices)} extrema (label_period={QuickAdapterV3.td_format(label_period)} / {label_period_candles=} / {label_natr_ratio=:.2f})"
             )
index 42f73e1779a72adc12658af8373ecb108aec53ed..11fcc31d403dd234d11bfd4c15a65a0dafcaab67 100644 (file)
@@ -404,15 +404,15 @@ def zigzag(
     df: pd.DataFrame,
     natr_period: int = 14,
     natr_ratio: float = 6.0,
-) -> tuple[list[int], list[float], list[int], list[float]]:
+) -> tuple[list[int], list[float], list[TrendDirection], list[float]]:
     n = len(df)
     if df.empty or n < natr_period:
         return [], [], [], []
 
     natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy()
 
-    indices = df.index.tolist()
-    thresholds = natr_values * natr_ratio
+    indices: list[int] = df.index.tolist()
+    thresholds: np.ndarray = natr_values * natr_ratio
     closes = df.get("close").to_numpy()
     highs = df.get("high").to_numpy()
     lows = df.get("low").to_numpy()
@@ -421,7 +421,7 @@ def zigzag(
 
     pivots_indices: list[int] = []
     pivots_values: list[float] = []
-    pivots_directions: list[int] = []
+    pivots_directions: list[TrendDirection] = []
     pivots_scaled_natrs: list[float] = []
     last_pivot_pos: int = -1
 
@@ -476,7 +476,7 @@ def zigzag(
         last_pivot_pos = pos
         reset_candidate_pivot()
 
-    slope_ok_cache: dict[tuple[int, int, int, float], bool] = {}
+    slope_ok_cache: dict[tuple[int, int, TrendDirection, float], bool] = {}
 
     def get_slope_ok(
         pos: int,
@@ -487,7 +487,7 @@ def zigzag(
         cache_key = (
             pos,
             candidate_pivot_pos,
-            direction.value,
+            direction,
             min_slope,
         )
 
@@ -521,7 +521,7 @@ def zigzag(
         min_slope: float = np.finfo(float).eps,
         alpha: float = 0.05,
     ) -> bool:
-        start_pos = candidate_pivot_pos + 1
+        start_pos = min(candidate_pivot_pos + 1, n)
         end_pos = min(pos + 1, n)
         n_slopes = max(0, end_pos - start_pos)