]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
feat(qav3): add fractals filtering to reversal pivots labeling
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 1 May 2025 20:33:32 +0000 (22:33 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 1 May 2025 20:33:32 +0000 (22:33 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index 3c3bc52e34304f6fa0a12459f89e7a601f9507ea..5919ae3877cf5db751c43baa6f026de7c8b1c43a 100644 (file)
@@ -45,7 +45,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.7.22"
+    version = "3.7.23"
 
     @cached_property
     def _optuna_config(self) -> dict:
@@ -846,17 +846,48 @@ class TrendDirection(IntEnum):
     DOWN = -1
 
 
+def find_fractals(df: pd.DataFrame, fractal_period: int) -> tuple[list[int], list[int]]:
+    highs = df["high"].values
+    lows = df["low"].values
+    fractal_highs = []
+    fractal_lows = []
+    for i in range(fractal_period, max(fractal_period, len(df) - fractal_period)):
+        valid_high = True
+        valid_low = True
+        for j in range(1, fractal_period + 1):
+            if highs[i] <= highs[i - j] or highs[i] <= highs[i + j]:
+                valid_high = False
+            if lows[i] >= lows[i - j] or lows[i] >= lows[i + j]:
+                valid_low = False
+            if not valid_high and not valid_low:
+                break
+        if valid_high:
+            fractal_highs.append(i)
+        if valid_low:
+            fractal_lows.append(i)
+    return fractal_highs, fractal_lows
+
+
 def zigzag(
     df: pd.DataFrame,
-    period: int = 14,
-    ratio: float = 1.0,
+    natr_period: int = 14,
+    natr_ratio: float = 1.0,
+    fractal_period: int = 2,
     depth: int = 12,
 ) -> tuple[list[int], list[float], list[int]]:
     if df.empty or len(df) < 2:
         return [], [], []
 
+    fractal_highs, fractal_lows = find_fractals(df, fractal_period)
+    fractal_high_set = set(fractal_highs)
+    fractal_low_set = set(fractal_lows)
+    is_fractal_high = [i in fractal_high_set for i in range(len(df))]
+    is_fractal_low = [i in fractal_low_set for i in range(len(df))]
+
     indices = df.index.tolist()
-    thresholds = (ta.NATR(df, timeperiod=period) * ratio).fillna(method="bfill").values
+    thresholds = (
+        (ta.NATR(df, timeperiod=natr_period) * natr_ratio).fillna(method="bfill").values
+    )
     highs = df["high"].values
     lows = df["low"].values
 
@@ -877,11 +908,12 @@ def zigzag(
             pivots_values[-1] = value
             pivots_directions[-1] = direction
 
-    initial_high_pos = 0
-    initial_low_pos = 0
+    start_pos = fractal_period
+    initial_high_pos = start_pos
+    initial_low_pos = start_pos
     initial_high = highs[initial_high_pos]
     initial_low = lows[initial_low_pos]
-    for i in range(1, len(df)):
+    for i in range(start_pos + 1, len(df)):
         if highs[i] > initial_high:
             initial_high, initial_high_pos = highs[i], i
         if lows[i] < initial_low:
@@ -889,11 +921,14 @@ def zigzag(
 
         initial_move_from_high = (initial_high - lows[i]) / initial_high
         initial_move_from_low = (highs[i] - initial_low) / initial_low
-        if initial_move_from_high >= thresholds[i]:
+        if (
+            initial_move_from_high >= thresholds[i]
+            and is_fractal_high[initial_high_pos]
+        ):
             add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
             state = TrendDirection.DOWN
             break
-        elif initial_move_from_low >= thresholds[i]:
+        elif initial_move_from_low >= thresholds[i] and is_fractal_low[initial_low_pos]:
             add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
             state = TrendDirection.UP
             break
@@ -903,40 +938,48 @@ def zigzag(
     for i in range(i + 1, len(df)):
         last_pivot_val = pivots_values[-1]
         if state == TrendDirection.UP:
-            if highs[i] > last_pivot_val:
+            if highs[i] > last_pivot_val and is_fractal_high[i]:
                 update_last_pivot(i, highs[i], TrendDirection.UP)
-            elif (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i] and (
-                i - last_pivot_pos
-            ) >= depth:
+            elif (
+                (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i]
+                and (i - last_pivot_pos) >= depth
+                and is_fractal_low[i]
+            ):
                 add_pivot(i, lows[i], TrendDirection.DOWN)
                 state = TrendDirection.DOWN
         elif state == TrendDirection.DOWN:
-            if lows[i] < last_pivot_val:
+            if lows[i] < last_pivot_val and is_fractal_low[i]:
                 update_last_pivot(i, lows[i], TrendDirection.DOWN)
-            elif (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i] and (
-                i - last_pivot_pos
-            ) >= depth:
+            elif (
+                (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i]
+                and (i - last_pivot_pos) >= depth
+                and is_fractal_high[i]
+            ):
                 add_pivot(i, highs[i], TrendDirection.UP)
                 state = TrendDirection.UP
 
     final_pos = len(df) - 1
-    if state != TrendDirection.NEUTRAL and (final_pos - last_pivot_pos) >= depth:
-        last_pivot_val = pivots_values[-1]
-        price_move = (
-            (highs[final_pos] - last_pivot_val) / last_pivot_val
-            if state == TrendDirection.UP
-            else (last_pivot_val - lows[final_pos]) / last_pivot_val
+    last_pivot_val = pivots_values[-1]
+    final_price_move = (
+        (highs[final_pos] - last_pivot_val) / last_pivot_val
+        if state == TrendDirection.UP
+        else (last_pivot_val - lows[final_pos]) / last_pivot_val
+    )
+    if (
+        state != TrendDirection.NEUTRAL
+        and (final_pos - last_pivot_pos) >= depth
+        and final_price_move >= thresholds[final_pos]
+        and indices[final_pos] != pivots_indices[-1]
+        and (
+            (state == TrendDirection.UP and is_fractal_high[final_pos])
+            or (state == TrendDirection.DOWN and is_fractal_low[final_pos])
+        )
+    ):
+        add_pivot(
+            final_pos,
+            highs[final_pos] if state == TrendDirection.UP else lows[final_pos],
+            state,
         )
-
-        if (
-            price_move >= thresholds[final_pos]
-            and indices[final_pos] != pivots_indices[-1]
-        ):
-            add_pivot(
-                final_pos,
-                highs[final_pos] if state == TrendDirection.UP else lows[final_pos],
-                state,
-            )
 
     return pivots_indices, pivots_values, pivots_directions
 
@@ -974,8 +1017,8 @@ def label_objective(
 
     _, pivots_values, _ = zigzag(
         df,
-        period=label_period_candles,
-        ratio=label_natr_ratio,
+        natr_period=label_period_candles,
+        natr_ratio=label_natr_ratio,
     )
 
     if len(pivots_values) < 2:
index 07ddaae17dfd965ded5098a42baddbad1ba3aff4..9caab9de98a629b0265b3de300255c78c3ab8f6b 100644 (file)
@@ -58,7 +58,7 @@ class QuickAdapterV3(IStrategy):
     INTERFACE_VERSION = 3
 
     def version(self) -> str:
-        return "3.3.16"
+        return "3.3.17"
 
     timeframe = "5m"
 
@@ -378,8 +378,8 @@ class QuickAdapterV3(IStrategy):
         pair = str(metadata.get("pair"))
         pivots_indices, _, pivots_directions = zigzag(
             dataframe,
-            period=self.get_label_period_candles(pair),
-            ratio=self.get_label_natr_ratio(pair),
+            natr_period=self.get_label_period_candles(pair),
+            natr_ratio=self.get_label_natr_ratio(pair),
         )
         dataframe[EXTREMA_COLUMN] = 0
         for pivot_idx, pivot_dir in zip(pivots_indices, pivots_directions):
index 2e56cccaf5e309f92c7807cd70c3b97725969224..d4fafe8acf3fb9d83f77e8eafe7b81bea70e5fcf 100644 (file)
@@ -305,17 +305,48 @@ class TrendDirection(IntEnum):
     DOWN = -1
 
 
+def find_fractals(df: pd.DataFrame, fractal_period: int) -> tuple[list[int], list[int]]:
+    highs = df["high"].values
+    lows = df["low"].values
+    fractal_highs = []
+    fractal_lows = []
+    for i in range(fractal_period, max(fractal_period, len(df) - fractal_period)):
+        valid_high = True
+        valid_low = True
+        for j in range(1, fractal_period + 1):
+            if highs[i] <= highs[i - j] or highs[i] <= highs[i + j]:
+                valid_high = False
+            if lows[i] >= lows[i - j] or lows[i] >= lows[i + j]:
+                valid_low = False
+            if not valid_high and not valid_low:
+                break
+        if valid_high:
+            fractal_highs.append(i)
+        if valid_low:
+            fractal_lows.append(i)
+    return fractal_highs, fractal_lows
+
+
 def zigzag(
     df: pd.DataFrame,
-    period: int = 14,
-    ratio: float = 1.0,
+    natr_period: int = 14,
+    natr_ratio: float = 1.0,
+    fractal_period: int = 2,
     depth: int = 12,
 ) -> tuple[list[int], list[float], list[int]]:
     if df.empty or len(df) < 2:
         return [], [], []
 
+    fractal_highs, fractal_lows = find_fractals(df, fractal_period)
+    fractal_high_set = set(fractal_highs)
+    fractal_low_set = set(fractal_lows)
+    is_fractal_high = [i in fractal_high_set for i in range(len(df))]
+    is_fractal_low = [i in fractal_low_set for i in range(len(df))]
+
     indices = df.index.tolist()
-    thresholds = (ta.NATR(df, timeperiod=period) * ratio).fillna(method="bfill").values
+    thresholds = (
+        (ta.NATR(df, timeperiod=natr_period) * natr_ratio).fillna(method="bfill").values
+    )
     highs = df["high"].values
     lows = df["low"].values
 
@@ -336,11 +367,12 @@ def zigzag(
             pivots_values[-1] = value
             pivots_directions[-1] = direction
 
-    initial_high_pos = 0
-    initial_low_pos = 0
+    start_pos = fractal_period
+    initial_high_pos = start_pos
+    initial_low_pos = start_pos
     initial_high = highs[initial_high_pos]
     initial_low = lows[initial_low_pos]
-    for i in range(1, len(df)):
+    for i in range(start_pos + 1, len(df)):
         if highs[i] > initial_high:
             initial_high, initial_high_pos = highs[i], i
         if lows[i] < initial_low:
@@ -348,11 +380,14 @@ def zigzag(
 
         initial_move_from_high = (initial_high - lows[i]) / initial_high
         initial_move_from_low = (highs[i] - initial_low) / initial_low
-        if initial_move_from_high >= thresholds[i]:
+        if (
+            initial_move_from_high >= thresholds[i]
+            and is_fractal_high[initial_high_pos]
+        ):
             add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
             state = TrendDirection.DOWN
             break
-        elif initial_move_from_low >= thresholds[i]:
+        elif initial_move_from_low >= thresholds[i] and is_fractal_low[initial_low_pos]:
             add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
             state = TrendDirection.UP
             break
@@ -362,39 +397,47 @@ def zigzag(
     for i in range(i + 1, len(df)):
         last_pivot_val = pivots_values[-1]
         if state == TrendDirection.UP:
-            if highs[i] > last_pivot_val:
+            if highs[i] > last_pivot_val and is_fractal_high[i]:
                 update_last_pivot(i, highs[i], TrendDirection.UP)
-            elif (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i] and (
-                i - last_pivot_pos
-            ) >= depth:
+            elif (
+                (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i]
+                and (i - last_pivot_pos) >= depth
+                and is_fractal_low[i]
+            ):
                 add_pivot(i, lows[i], TrendDirection.DOWN)
                 state = TrendDirection.DOWN
         elif state == TrendDirection.DOWN:
-            if lows[i] < last_pivot_val:
+            if lows[i] < last_pivot_val and is_fractal_low[i]:
                 update_last_pivot(i, lows[i], TrendDirection.DOWN)
-            elif (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i] and (
-                i - last_pivot_pos
-            ) >= depth:
+            elif (
+                (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i]
+                and (i - last_pivot_pos) >= depth
+                and is_fractal_high[i]
+            ):
                 add_pivot(i, highs[i], TrendDirection.UP)
                 state = TrendDirection.UP
 
     final_pos = len(df) - 1
-    if state != TrendDirection.NEUTRAL and (final_pos - last_pivot_pos) >= depth:
-        last_pivot_val = pivots_values[-1]
-        price_move = (
-            (highs[final_pos] - last_pivot_val) / last_pivot_val
-            if state == TrendDirection.UP
-            else (last_pivot_val - lows[final_pos]) / last_pivot_val
+    last_pivot_val = pivots_values[-1]
+    final_price_move = (
+        (highs[final_pos] - last_pivot_val) / last_pivot_val
+        if state == TrendDirection.UP
+        else (last_pivot_val - lows[final_pos]) / last_pivot_val
+    )
+    if (
+        state != TrendDirection.NEUTRAL
+        and (final_pos - last_pivot_pos) >= depth
+        and final_price_move >= thresholds[final_pos]
+        and indices[final_pos] != pivots_indices[-1]
+        and (
+            (state == TrendDirection.UP and is_fractal_high[final_pos])
+            or (state == TrendDirection.DOWN and is_fractal_low[final_pos])
+        )
+    ):
+        add_pivot(
+            final_pos,
+            highs[final_pos] if state == TrendDirection.UP else lows[final_pos],
+            state,
         )
-
-        if (
-            price_move >= thresholds[final_pos]
-            and indices[final_pos] != pivots_indices[-1]
-        ):
-            add_pivot(
-                final_pos,
-                highs[final_pos] if state == TrendDirection.UP else lows[final_pos],
-                state,
-            )
 
     return pivots_indices, pivots_values, pivots_directions