]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
perf(qav3): capture labeled pivot reversal strength
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 16 May 2025 23:28:54 +0000 (01:28 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 16 May 2025 23:28:54 +0000 (01:28 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index aff8b9a35407e8ab7daae0c15e179c795862fd9e..bd527431db95c1f011bcb41f852e5f7d8dffd655 100644 (file)
@@ -45,7 +45,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.7.33"
+    version = "3.7.34"
 
     @cached_property
     def _optuna_config(self) -> dict:
@@ -900,11 +900,12 @@ def zigzag(
         candidate_pivot_pos: int,
         next_confirmation_pos: int,
         direction: TrendDirection,
+        extrema_threshold: float = 0.95,
+        min_slope_strength: float = 0.5,
         move_away_ratio: float = 0.25,
-        min_price_change_ratio: float = 0.125,
     ) -> bool:
         next_start = next_confirmation_pos + 1
-        next_end = min(next_confirmation_pos + confirmation_window + 1, n)
+        next_end = min(next_start + confirmation_window, n)
         previous_start = max(candidate_pivot_pos - confirmation_window, 0)
         previous_end = candidate_pivot_pos
         if next_start >= next_end or previous_start >= previous_end:
@@ -916,34 +917,43 @@ def zigzag(
         next_lows = lows[next_slice]
         previous_slice = slice(previous_start, previous_end)
         previous_closes = closes[previous_slice]
-        previous_highs = highs[previous_slice]
-        previous_lows = lows[previous_slice]
 
         local_extrema_ok = False
         if direction == TrendDirection.DOWN:
-            if (
-                np.all(next_closes < highs[candidate_pivot_pos])
-                and np.all(previous_closes < highs[candidate_pivot_pos])
-                and np.max(next_highs) <= highs[candidate_pivot_pos]
-                and np.max(previous_highs) <= highs[candidate_pivot_pos]
-            ):
-                local_extrema_ok = True
+            valid_next = (
+                np.sum(next_closes < highs[candidate_pivot_pos]) / len(next_closes)
+                >= extrema_threshold
+            )
+            valid_previous = (
+                np.sum(previous_closes < highs[candidate_pivot_pos])
+                / len(previous_closes)
+                >= extrema_threshold
+            )
+            local_extrema_ok = valid_next and valid_previous
         elif direction == TrendDirection.UP:
-            if (
-                np.all(next_closes > lows[candidate_pivot_pos])
-                and np.all(previous_closes > lows[candidate_pivot_pos])
-                and np.min(next_lows) >= lows[candidate_pivot_pos]
-                and np.min(previous_lows) >= lows[candidate_pivot_pos]
-            ):
-                local_extrema_ok = True
-
-        slope_ok = True
-        if len(next_closes) >= 2:
-            next_slope = (next_closes[-1] - next_closes[0]) / (len(next_closes) - 1)
+            valid_next = (
+                np.sum(next_closes > lows[candidate_pivot_pos]) / len(next_closes)
+                >= extrema_threshold
+            )
+            valid_previous = (
+                np.sum(previous_closes > lows[candidate_pivot_pos])
+                / len(previous_closes)
+                >= extrema_threshold
+            )
+            local_extrema_ok = valid_next and valid_previous
+
+        slope_ok = False
+        next_closes_std = np.std(next_closes)
+        if len(next_closes) >= 2 and next_closes_std > np.finfo(float).eps:
+            weights = np.linspace(0.5, 1.5, len(next_closes))
+            next_slope = np.polyfit(range(len(next_closes)), next_closes, 1, w=weights)[
+                0
+            ]
+            next_slope_strength = next_slope / next_closes_std
             if direction == TrendDirection.DOWN:
-                slope_ok = next_slope < 0
+                slope_ok = next_slope_strength < -min_slope_strength
             elif direction == TrendDirection.UP:
-                slope_ok = next_slope > 0
+                slope_ok = next_slope_strength > min_slope_strength
 
         significant_move_away_ok = False
         if direction == TrendDirection.DOWN:
@@ -961,23 +971,7 @@ def zigzag(
             ):
                 significant_move_away_ok = True
 
-        min_price_change_ok = False
-        required_price_change = (
-            thresholds[next_confirmation_pos]
-            * min_price_change_ratio
-            * closes[next_confirmation_pos]
-        )
-        if len(next_closes) > 0:
-            actual_price_change = abs(next_closes[-1] - next_closes[0])
-            if actual_price_change >= required_price_change:
-                min_price_change_ok = True
-
-        return (
-            local_extrema_ok
-            and slope_ok
-            and significant_move_away_ok
-            and min_price_change_ok
-        )
+        return local_extrema_ok and slope_ok and significant_move_away_ok
 
     start_pos = 0
     initial_high_pos = start_pos
index 019348454be063b91d76f3325d7ffc8d2e5b24b3..f6f458dbf172c901fd319d9449806e61507e5559 100644 (file)
@@ -58,7 +58,7 @@ class QuickAdapterV3(IStrategy):
     INTERFACE_VERSION = 3
 
     def version(self) -> str:
-        return "3.3.29"
+        return "3.3.30"
 
     timeframe = "5m"
 
index 7cc7ea36e0b9150c1d97bd39d44c36ce123e0d58..6c11c8128c897fd6bb4a7dab9252daa3cfc6416b 100644 (file)
@@ -396,11 +396,12 @@ def zigzag(
         candidate_pivot_pos: int,
         next_confirmation_pos: int,
         direction: TrendDirection,
+        extrema_threshold: float = 0.95,
+        min_slope_strength: float = 0.5,
         move_away_ratio: float = 0.25,
-        min_price_change_ratio: float = 0.125,
     ) -> bool:
         next_start = next_confirmation_pos + 1
-        next_end = min(next_confirmation_pos + confirmation_window + 1, n)
+        next_end = min(next_start + confirmation_window, n)
         previous_start = max(candidate_pivot_pos - confirmation_window, 0)
         previous_end = candidate_pivot_pos
         if next_start >= next_end or previous_start >= previous_end:
@@ -412,34 +413,43 @@ def zigzag(
         next_lows = lows[next_slice]
         previous_slice = slice(previous_start, previous_end)
         previous_closes = closes[previous_slice]
-        previous_highs = highs[previous_slice]
-        previous_lows = lows[previous_slice]
 
         local_extrema_ok = False
         if direction == TrendDirection.DOWN:
-            if (
-                np.all(next_closes < highs[candidate_pivot_pos])
-                and np.all(previous_closes < highs[candidate_pivot_pos])
-                and np.max(next_highs) <= highs[candidate_pivot_pos]
-                and np.max(previous_highs) <= highs[candidate_pivot_pos]
-            ):
-                local_extrema_ok = True
+            valid_next = (
+                np.sum(next_closes < highs[candidate_pivot_pos]) / len(next_closes)
+                >= extrema_threshold
+            )
+            valid_previous = (
+                np.sum(previous_closes < highs[candidate_pivot_pos])
+                / len(previous_closes)
+                >= extrema_threshold
+            )
+            local_extrema_ok = valid_next and valid_previous
         elif direction == TrendDirection.UP:
-            if (
-                np.all(next_closes > lows[candidate_pivot_pos])
-                and np.all(previous_closes > lows[candidate_pivot_pos])
-                and np.min(next_lows) >= lows[candidate_pivot_pos]
-                and np.min(previous_lows) >= lows[candidate_pivot_pos]
-            ):
-                local_extrema_ok = True
-
-        slope_ok = True
-        if len(next_closes) >= 2:
-            next_slope = (next_closes[-1] - next_closes[0]) / (len(next_closes) - 1)
+            valid_next = (
+                np.sum(next_closes > lows[candidate_pivot_pos]) / len(next_closes)
+                >= extrema_threshold
+            )
+            valid_previous = (
+                np.sum(previous_closes > lows[candidate_pivot_pos])
+                / len(previous_closes)
+                >= extrema_threshold
+            )
+            local_extrema_ok = valid_next and valid_previous
+
+        slope_ok = False
+        next_closes_std = np.std(next_closes)
+        if len(next_closes) >= 2 and next_closes_std > np.finfo(float).eps:
+            weights = np.linspace(0.5, 1.5, len(next_closes))
+            next_slope = np.polyfit(range(len(next_closes)), next_closes, 1, w=weights)[
+                0
+            ]
+            next_slope_strength = next_slope / next_closes_std
             if direction == TrendDirection.DOWN:
-                slope_ok = next_slope < 0
+                slope_ok = next_slope_strength < -min_slope_strength
             elif direction == TrendDirection.UP:
-                slope_ok = next_slope > 0
+                slope_ok = next_slope_strength > min_slope_strength
 
         significant_move_away_ok = False
         if direction == TrendDirection.DOWN:
@@ -457,23 +467,7 @@ def zigzag(
             ):
                 significant_move_away_ok = True
 
-        min_price_change_ok = False
-        required_price_change = (
-            thresholds[next_confirmation_pos]
-            * min_price_change_ratio
-            * closes[next_confirmation_pos]
-        )
-        if len(next_closes) > 0:
-            actual_price_change = abs(next_closes[-1] - next_closes[0])
-            if actual_price_change >= required_price_change:
-                min_price_change_ok = True
-
-        return (
-            local_extrema_ok
-            and slope_ok
-            and significant_move_away_ok
-            and min_price_change_ok
-        )
+        return local_extrema_ok and slope_ok and significant_move_away_ok
 
     start_pos = 0
     initial_high_pos = start_pos