]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
fix(qav3): untangle candidate pivot logic from confirmed pivot one
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 5 May 2025 16:48:01 +0000 (18:48 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 5 May 2025 16:48:01 +0000 (18:48 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index ed01c2b8617e04887a799172edd3c5d82e5e91e5..6fac905fd3912745723a7abf69bc3b0fb60f56a5 100644 (file)
@@ -45,7 +45,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.7.28"
+    version = "3.7.29"
 
     @cached_property
     def _optuna_config(self) -> dict:
@@ -850,10 +850,10 @@ def zigzag(
     df: pd.DataFrame,
     natr_period: int = 14,
     natr_ratio: float = 1.0,
-    confirmation_window: int = 3,
+    confirmation_window: int = 2,
     depth: int = 12,
 ) -> tuple[list[int], list[float], list[int]]:
-    if df.empty or len(df) < natr_period + confirmation_window:
+    if df.empty or len(df) < max(natr_period, 2 * confirmation_window + 1):
         return [], [], []
 
     indices = df.index.tolist()
@@ -865,8 +865,19 @@ def zigzag(
     lows = df["low"].values
 
     state: TrendDirection = TrendDirection.NEUTRAL
-    pivots_indices, pivots_values, pivots_directions = [], [], []
+
     last_pivot_pos = -depth - 1
+    pivots_indices, pivots_values, pivots_directions = [], [], []
+
+    candidate_pivot_pos = -1
+    candidate_pivot_value = np.nan
+    candidate_pivot_direction: TrendDirection = TrendDirection.NEUTRAL
+
+    def update_candidate_pivot(pos: int, value: float, direction: TrendDirection):
+        nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction
+        candidate_pivot_pos = pos
+        candidate_pivot_value = value
+        candidate_pivot_direction = direction
 
     def add_pivot(pos: int, value: float, direction: TrendDirection):
         nonlocal last_pivot_pos
@@ -876,21 +887,40 @@ def zigzag(
         pivots_values.append(value)
         pivots_directions.append(direction)
         last_pivot_pos = pos
-
-    def update_last_pivot(pos: int, value: float, direction: TrendDirection):
-        if pivots_indices and indices[pos] != pivots_indices[-1]:
-            pivots_indices[-1] = indices[pos]
-            pivots_values[-1] = value
-            pivots_directions[-1] = direction
+        update_candidate_pivot(
+            pos,
+            value,
+            TrendDirection.UP
+            if direction == TrendDirection.DOWN
+            else TrendDirection.DOWN,
+        )
 
     def is_reversal_confirmed(pos: int, direction: TrendDirection) -> bool:
-        if pos + confirmation_window >= len(df):
+        if pos - confirmation_window < 0 or pos + confirmation_window >= len(df):
             return False
-        next_closes = closes[pos + 1 : pos + confirmation_window + 1]
+        next_slice = slice(pos + 1, pos + confirmation_window + 1)
+        next_closes = closes[next_slice]
+        next_highs = highs[next_slice]
+        next_lows = lows[next_slice]
+        previous_slice = slice(pos - confirmation_window, pos)
+        previous_closes = closes[previous_slice]
+        previous_highs = highs[previous_slice]
+        previous_lows = lows[previous_slice]
+
         if direction == TrendDirection.DOWN:
-            return np.all(next_closes < highs[pos])
+            return (
+                np.all(next_closes < highs[pos])
+                and np.all(previous_closes < highs[pos])
+                and np.all(next_highs <= highs[pos])
+                and np.all(previous_highs <= highs[pos])
+            )
         elif direction == TrendDirection.UP:
-            return np.all(next_closes > lows[pos])
+            return (
+                np.all(next_closes > lows[pos])
+                and np.all(previous_closes > lows[pos])
+                and np.all(next_lows >= lows[pos])
+                and np.all(previous_lows >= lows[pos])
+            )
         return False
 
     start_pos = 0
@@ -899,27 +929,24 @@ def zigzag(
     initial_high = highs[initial_high_pos]
     initial_low = lows[initial_low_pos]
     for i in range(start_pos + 1, len(df)):
-        if highs[i] > initial_high:
-            initial_high, initial_high_pos = highs[i], i
-        if lows[i] < initial_low:
-            initial_low, initial_low_pos = lows[i], i
-        if (
-            i - initial_high_pos < confirmation_window
-            or i - initial_low_pos < confirmation_window
-        ):
-            continue
-
-        initial_move_from_high = (initial_high - lows[i]) / initial_high
-        initial_move_from_low = (highs[i] - initial_low) / initial_low
-        if initial_move_from_high >= thresholds[i] and is_reversal_confirmed(
-            initial_high_pos, TrendDirection.DOWN
-        ):
+        current_high = highs[i]
+        current_low = lows[i]
+        if current_high > initial_high:
+            initial_high, initial_high_pos = current_high, i
+        if current_low < initial_low:
+            initial_low, initial_low_pos = current_low, i
+
+        initial_move_from_high = (initial_high - current_low) / initial_high
+        initial_move_from_low = (current_high - initial_low) / initial_low
+        if initial_move_from_high >= thresholds[
+            initial_high_pos
+        ] and is_reversal_confirmed(initial_high_pos, TrendDirection.DOWN):
             add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
             state = TrendDirection.DOWN
             break
-        elif initial_move_from_low >= thresholds[i] and is_reversal_confirmed(
-            initial_low_pos, TrendDirection.UP
-        ):
+        elif initial_move_from_low >= thresholds[
+            initial_low_pos
+        ] and is_reversal_confirmed(initial_low_pos, TrendDirection.UP):
             add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
             state = TrendDirection.UP
             break
@@ -929,26 +956,30 @@ def zigzag(
     for i in range(last_pivot_pos + 1, len(df)):
         current_high = highs[i]
         current_low = lows[i]
-        last_pivot_val = pivots_values[-1]
+
         if state == TrendDirection.UP:
-            if current_high > last_pivot_val:
-                update_last_pivot(i, current_high, TrendDirection.UP)
-            elif (
-                (last_pivot_val - current_low) / last_pivot_val >= thresholds[i]
-                and (i - last_pivot_pos) > depth
-                and is_reversal_confirmed(i, TrendDirection.DOWN)
+            if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value:
+                update_candidate_pivot(i, current_high, TrendDirection.UP)
+            if (
+                (candidate_pivot_value - current_low) / candidate_pivot_value
+                >= thresholds[candidate_pivot_pos]
+                and (candidate_pivot_pos - last_pivot_pos) >= depth
+                and is_reversal_confirmed(candidate_pivot_pos, TrendDirection.DOWN)
             ):
-                add_pivot(i, current_low, TrendDirection.DOWN)
+                add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
                 state = TrendDirection.DOWN
         elif state == TrendDirection.DOWN:
-            if current_low < last_pivot_val:
-                update_last_pivot(i, current_low, TrendDirection.DOWN)
-            elif (
-                (current_high - last_pivot_val) / last_pivot_val >= thresholds[i]
-                and (i - last_pivot_pos) > depth
-                and is_reversal_confirmed(i, TrendDirection.UP)
+            if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value:
+                update_candidate_pivot(i, current_low, TrendDirection.DOWN)
+            if (
+                (current_high - candidate_pivot_value) / candidate_pivot_value
+                >= thresholds[candidate_pivot_pos]
+                and (candidate_pivot_pos - last_pivot_pos) >= depth
+                and is_reversal_confirmed(candidate_pivot_pos, TrendDirection.UP)
             ):
-                add_pivot(i, current_high, TrendDirection.UP)
+                add_pivot(
+                    candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
+                )
                 state = TrendDirection.UP
 
     return pivots_indices, pivots_values, pivots_directions
index d2e8f8850418287455ac4748498d6a06eb328b43..07fadce5de526ba7f49ca7509c6769ff4f57c78c 100644 (file)
@@ -58,7 +58,7 @@ class QuickAdapterV3(IStrategy):
     INTERFACE_VERSION = 3
 
     def version(self) -> str:
-        return "3.3.22"
+        return "3.3.23"
 
     timeframe = "5m"
 
@@ -643,12 +643,15 @@ class QuickAdapterV3(IStrategy):
         last_candle_natr = last_candle["natr_label_period_candles"]
         if isna(last_candle_natr):
             return False
-        entry_price_fluctuation = (
-            last_candle_close * last_candle_natr * self.get_entry_natr_ratio(pair)
-        )
+        entry_natr_ratio = self.get_entry_natr_ratio(pair)
         if side == "long":
-            lower_bound = last_candle_low - entry_price_fluctuation
-            upper_bound = last_candle_close + entry_price_fluctuation
+            lower_bound = (
+                last_candle_low - last_candle_low * last_candle_natr * entry_natr_ratio
+            )
+            upper_bound = (
+                last_candle_close
+                + last_candle_close * last_candle_natr * entry_natr_ratio
+            )
             if lower_bound < 0:
                 logger.info(
                     f"User denied {side} entry for {pair}: calculated lower bound {lower_bound} is below zero"
@@ -661,8 +664,14 @@ class QuickAdapterV3(IStrategy):
                     f"User denied {side} entry for {pair}: rate {rate} outside bounds [{lower_bound}, {upper_bound}]"
                 )
         elif side == "short":
-            lower_bound = last_candle_close - entry_price_fluctuation
-            upper_bound = last_candle_high + entry_price_fluctuation
+            lower_bound = (
+                last_candle_close
+                - last_candle_close * last_candle_natr * entry_natr_ratio
+            )
+            upper_bound = (
+                last_candle_high
+                + last_candle_high * last_candle_natr * entry_natr_ratio
+            )
             if lower_bound < 0:
                 logger.info(
                     f"User denied {side} entry for {pair}: calculated lower bound {lower_bound} is below zero"
index 3111957cd66c05bbf68b8b4b7346a1ce50239e7b..f0853f317932993c6dee93f5dcc57dc025b4dd53 100644 (file)
@@ -339,10 +339,10 @@ def zigzag(
     df: pd.DataFrame,
     natr_period: int = 14,
     natr_ratio: float = 1.0,
-    confirmation_window: int = 3,
+    confirmation_window: int = 2,
     depth: int = 12,
 ) -> tuple[list[int], list[float], list[int]]:
-    if df.empty or len(df) < natr_period + confirmation_window:
+    if df.empty or len(df) < max(natr_period, 2 * confirmation_window + 1):
         return [], [], []
 
     indices = df.index.tolist()
@@ -354,8 +354,19 @@ def zigzag(
     lows = df["low"].values
 
     state: TrendDirection = TrendDirection.NEUTRAL
-    pivots_indices, pivots_values, pivots_directions = [], [], []
+
     last_pivot_pos = -depth - 1
+    pivots_indices, pivots_values, pivots_directions = [], [], []
+
+    candidate_pivot_pos = -1
+    candidate_pivot_value = np.nan
+    candidate_pivot_direction: TrendDirection = TrendDirection.NEUTRAL
+
+    def update_candidate_pivot(pos: int, value: float, direction: TrendDirection):
+        nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction
+        candidate_pivot_pos = pos
+        candidate_pivot_value = value
+        candidate_pivot_direction = direction
 
     def add_pivot(pos: int, value: float, direction: TrendDirection):
         nonlocal last_pivot_pos
@@ -365,21 +376,40 @@ def zigzag(
         pivots_values.append(value)
         pivots_directions.append(direction)
         last_pivot_pos = pos
-
-    def update_last_pivot(pos: int, value: float, direction: TrendDirection):
-        if pivots_indices and indices[pos] != pivots_indices[-1]:
-            pivots_indices[-1] = indices[pos]
-            pivots_values[-1] = value
-            pivots_directions[-1] = direction
+        update_candidate_pivot(
+            pos,
+            value,
+            TrendDirection.UP
+            if direction == TrendDirection.DOWN
+            else TrendDirection.DOWN,
+        )
 
     def is_reversal_confirmed(pos: int, direction: TrendDirection) -> bool:
-        if pos + confirmation_window >= len(df):
+        if pos - confirmation_window < 0 or pos + confirmation_window >= len(df):
             return False
-        next_closes = closes[pos + 1 : pos + confirmation_window + 1]
+        next_slice = slice(pos + 1, pos + confirmation_window + 1)
+        next_closes = closes[next_slice]
+        next_highs = highs[next_slice]
+        next_lows = lows[next_slice]
+        previous_slice = slice(pos - confirmation_window, pos)
+        previous_closes = closes[previous_slice]
+        previous_highs = highs[previous_slice]
+        previous_lows = lows[previous_slice]
+
         if direction == TrendDirection.DOWN:
-            return np.all(next_closes < highs[pos])
+            return (
+                np.all(next_closes < highs[pos])
+                and np.all(previous_closes < highs[pos])
+                and np.all(next_highs <= highs[pos])
+                and np.all(previous_highs <= highs[pos])
+            )
         elif direction == TrendDirection.UP:
-            return np.all(next_closes > lows[pos])
+            return (
+                np.all(next_closes > lows[pos])
+                and np.all(previous_closes > lows[pos])
+                and np.all(next_lows >= lows[pos])
+                and np.all(previous_lows >= lows[pos])
+            )
         return False
 
     start_pos = 0
@@ -388,27 +418,24 @@ def zigzag(
     initial_high = highs[initial_high_pos]
     initial_low = lows[initial_low_pos]
     for i in range(start_pos + 1, len(df)):
-        if highs[i] > initial_high:
-            initial_high, initial_high_pos = highs[i], i
-        if lows[i] < initial_low:
-            initial_low, initial_low_pos = lows[i], i
-        if (
-            i - initial_high_pos < confirmation_window
-            or i - initial_low_pos < confirmation_window
-        ):
-            continue
-
-        initial_move_from_high = (initial_high - lows[i]) / initial_high
-        initial_move_from_low = (highs[i] - initial_low) / initial_low
-        if initial_move_from_high >= thresholds[i] and is_reversal_confirmed(
-            initial_high_pos, TrendDirection.DOWN
-        ):
+        current_high = highs[i]
+        current_low = lows[i]
+        if current_high > initial_high:
+            initial_high, initial_high_pos = current_high, i
+        if current_low < initial_low:
+            initial_low, initial_low_pos = current_low, i
+
+        initial_move_from_high = (initial_high - current_low) / initial_high
+        initial_move_from_low = (current_high - initial_low) / initial_low
+        if initial_move_from_high >= thresholds[
+            initial_high_pos
+        ] and is_reversal_confirmed(initial_high_pos, TrendDirection.DOWN):
             add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
             state = TrendDirection.DOWN
             break
-        elif initial_move_from_low >= thresholds[i] and is_reversal_confirmed(
-            initial_low_pos, TrendDirection.UP
-        ):
+        elif initial_move_from_low >= thresholds[
+            initial_low_pos
+        ] and is_reversal_confirmed(initial_low_pos, TrendDirection.UP):
             add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
             state = TrendDirection.UP
             break
@@ -418,26 +445,30 @@ def zigzag(
     for i in range(last_pivot_pos + 1, len(df)):
         current_high = highs[i]
         current_low = lows[i]
-        last_pivot_val = pivots_values[-1]
+
         if state == TrendDirection.UP:
-            if current_high > last_pivot_val:
-                update_last_pivot(i, current_high, TrendDirection.UP)
-            elif (
-                (last_pivot_val - current_low) / last_pivot_val >= thresholds[i]
-                and (i - last_pivot_pos) > depth
-                and is_reversal_confirmed(i, TrendDirection.DOWN)
+            if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value:
+                update_candidate_pivot(i, current_high, TrendDirection.UP)
+            if (
+                (candidate_pivot_value - current_low) / candidate_pivot_value
+                >= thresholds[candidate_pivot_pos]
+                and (candidate_pivot_pos - last_pivot_pos) >= depth
+                and is_reversal_confirmed(candidate_pivot_pos, TrendDirection.DOWN)
             ):
-                add_pivot(i, current_low, TrendDirection.DOWN)
+                add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
                 state = TrendDirection.DOWN
         elif state == TrendDirection.DOWN:
-            if current_low < last_pivot_val:
-                update_last_pivot(i, current_low, TrendDirection.DOWN)
-            elif (
-                (current_high - last_pivot_val) / last_pivot_val >= thresholds[i]
-                and (i - last_pivot_pos) > depth
-                and is_reversal_confirmed(i, TrendDirection.UP)
+            if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value:
+                update_candidate_pivot(i, current_low, TrendDirection.DOWN)
+            if (
+                (current_high - candidate_pivot_value) / candidate_pivot_value
+                >= thresholds[candidate_pivot_pos]
+                and (candidate_pivot_pos - last_pivot_pos) >= depth
+                and is_reversal_confirmed(candidate_pivot_pos, TrendDirection.UP)
             ):
-                add_pivot(i, current_high, TrendDirection.UP)
+                add_pivot(
+                    candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
+                )
                 state = TrendDirection.UP
 
     return pivots_indices, pivots_values, pivots_directions