]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
fix(qav3): ensure pullbacks can't invalidate valid pivots at labeling
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 16 Jun 2025 18:49:34 +0000 (20:49 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 16 Jun 2025 18:49:34 +0000 (20:49 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index 304d2065a3eec7110f09a7c0fc48fbddce0ce582..a6b7c1b332273435205ba2bc900129e108887b34 100644 (file)
@@ -2,7 +2,6 @@ import copy
 from enum import IntEnum
 import logging
 import json
-import math
 import random
 from statistics import median
 import time
@@ -14,7 +13,7 @@ import sklearn
 import warnings
 import talib.abstract as ta
 
-from functools import cached_property, lru_cache
+from functools import cached_property
 from typing import Any, Callable, Optional
 from pathlib import Path
 from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
@@ -50,7 +49,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.7.92"
+    version = "3.7.93"
 
     @cached_property
     def _optuna_config(self) -> dict:
@@ -1310,25 +1309,10 @@ def zigzag(
 
         return volatility_quantile_cache[pos]
 
-    def calculate_slope_confirmation_window(
-        pos: int,
-        min_window: int = 3,
-        max_window: int = 5,
-    ) -> int:
-        volatility_quantile = calculate_volatility_quantile(pos)
-        if np.isnan(volatility_quantile):
-            return int(round(median([min_window, max_window])))
-
-        return np.clip(
-            round(min_window + (max_window - min_window) * volatility_quantile),
-            min_window,
-            max_window,
-        ).astype(int)
-
     def calculate_slopes_ok_threshold(
         pos: int,
-        min_threshold: float = 0.5,
-        max_threshold: float = 0.75,
+        min_threshold: float = 0.65,
+        max_threshold: float = 0.85,
     ) -> float:
         volatility_quantile = calculate_volatility_quantile(pos)
         if np.isnan(volatility_quantile):
@@ -1336,30 +1320,6 @@ def zigzag(
 
         return min_threshold + (max_threshold - min_threshold) * volatility_quantile
 
-    @lru_cache(maxsize=4096)
-    def calculate_slopes_ok_min_max(slopes_ok_threshold: float) -> tuple[int, int]:
-        min_slope_bound1 = math.ceil(1 / slopes_ok_threshold)
-        min_slope_bound2 = math.ceil(1 / (1 - slopes_ok_threshold))
-        return min(min_slope_bound1, min_slope_bound2), max(
-            min_slope_bound1, min_slope_bound2
-        )
-
-    def calculate_min_slopes_ok(pos: int, slopes_ok_threshold: float) -> int:
-        min_slopes_ok, max_slopes_ok = calculate_slopes_ok_min_max(slopes_ok_threshold)
-        if min_slopes_ok == max_slopes_ok:
-            return min_slopes_ok
-        volatility_quantile = calculate_volatility_quantile(pos)
-        if np.isnan(volatility_quantile):
-            return int(round(median([min_slopes_ok, max_slopes_ok])))
-
-        return np.clip(
-            round(
-                min_slopes_ok + (max_slopes_ok - min_slopes_ok) * volatility_quantile
-            ),
-            min_slopes_ok,
-            max_slopes_ok,
-        ).astype(int)
-
     def update_candidate_pivot(pos: int, value: float):
         nonlocal candidate_pivot_pos, candidate_pivot_value
         if 0 <= pos < n:
@@ -1382,51 +1342,39 @@ def zigzag(
         last_pivot_pos = pos
         reset_candidate_pivot()
 
-    slope_ok_cache: dict[tuple[int, int, bool, int, float], bool] = {}
+    slope_ok_cache: dict[tuple[int, int, int, float]] = {}
 
     def get_slope_ok(
         pos: int,
+        candidate_pivot_pos: int,
         direction: TrendDirection,
-        enable_weighting: bool,
-        slope_confirmation_window: int,
         min_slope: float,
     ) -> bool:
         cache_key = (
             pos,
+            candidate_pivot_pos,
             direction.value,
-            enable_weighting,
-            slope_confirmation_window,
             min_slope,
         )
 
         if cache_key in slope_ok_cache:
             return slope_ok_cache[cache_key]
 
-        next_start = pos
-        next_end = min(next_start + slope_confirmation_window, n)
-        next_closes = closes[next_start:next_end]
-
-        if len(next_closes) < 2:
+        if pos <= candidate_pivot_pos:
             slope_ok_cache[cache_key] = False
             return slope_ok_cache[cache_key]
 
-        log_next_closes = np.log(next_closes)
-        log_next_closes_length = len(log_next_closes)
+        log_candidate_pivot_close = np.log(closes[candidate_pivot_pos])
+        log_current_close = np.log(closes[pos])
 
-        polyfit_kwargs = {}
-        if enable_weighting:
-            polyfit_kwargs["w"] = np.linspace(0.5, 1.5, log_next_closes_length)
-        log_next_slope = np.polyfit(
-            range(log_next_closes_length),
-            log_next_closes,
-            1,
-            **polyfit_kwargs,
-        )[0]
+        log_slope_close = (log_current_close - log_candidate_pivot_close) / (
+            pos - candidate_pivot_pos
+        )
 
         if direction == TrendDirection.DOWN:
-            slope_ok_cache[cache_key] = log_next_slope < -min_slope
+            slope_ok_cache[cache_key] = log_slope_close < -min_slope
         elif direction == TrendDirection.UP:
-            slope_ok_cache[cache_key] = log_next_slope > min_slope
+            slope_ok_cache[cache_key] = log_slope_close > min_slope
         else:
             slope_ok_cache[cache_key] = False
 
@@ -1436,37 +1384,37 @@ def zigzag(
         pos: int,
         candidate_pivot_pos: int,
         direction: TrendDirection,
-        enable_weighting: bool = False,
         min_slope: float = np.finfo(float).eps,
+        alpha: float = 0.05,
     ) -> bool:
-        slope_confirmation_window = calculate_slope_confirmation_window(
-            candidate_pivot_pos
-        )
+        start_pos = candidate_pivot_pos + 1
+        end_pos = min(pos + 1, n)
+        n_slopes = max(0, end_pos - start_pos)
+
+        if n_slopes < 1:
+            return False
 
         slopes_ok: list[bool] = []
-        for i in range(candidate_pivot_pos + 1, min(pos + 1, n)):
+        for i in range(start_pos, end_pos):
             slopes_ok.append(
                 get_slope_ok(
                     pos=i,
+                    candidate_pivot_pos=candidate_pivot_pos,
                     direction=direction,
-                    enable_weighting=enable_weighting,
-                    slope_confirmation_window=slope_confirmation_window,
                     min_slope=min_slope,
                 )
             )
 
         slopes_ok_threshold = calculate_slopes_ok_threshold(candidate_pivot_pos)
-        min_slopes_ok = calculate_min_slopes_ok(
-            candidate_pivot_pos, slopes_ok_threshold
+        n_slopes_ok = sum(slopes_ok)
+        binomtest = sp.stats.binomtest(
+            k=n_slopes_ok, n=n_slopes, p=0.5, alternative="greater"
         )
-        n_slopes_ok = len(slopes_ok)
-        if n_slopes_ok > 0:
-            return (
-                n_slopes_ok >= min_slopes_ok
-                and sum(slopes_ok) / n_slopes_ok >= slopes_ok_threshold
-            )
 
-        return False
+        return (
+            binomtest.pvalue <= alpha
+            and (n_slopes_ok / n_slopes) >= slopes_ok_threshold
+        )
 
     start_pos = 0
     initial_high_pos = start_pos
index 0cdadd55e2b16daf6067df93c7f163d8f38b0f20..334c1b3d445c3e31f12eb52caf9b9a4c27c65704 100644 (file)
@@ -64,7 +64,7 @@ class QuickAdapterV3(IStrategy):
     INTERFACE_VERSION = 3
 
     def version(self) -> str:
-        return "3.3.96"
+        return "3.3.97"
 
     timeframe = "5m"
 
index d4e61fed5a422043d5b64f3f61cce3a221b36752..65435645ccec38de7db7dad0a75c729b41c899a1 100644 (file)
@@ -1,6 +1,5 @@
 from enum import IntEnum
 from functools import lru_cache
-import math
 from statistics import median
 import numpy as np
 import pandas as pd
@@ -439,25 +438,10 @@ def zigzag(
 
         return volatility_quantile_cache[pos]
 
-    def calculate_slope_confirmation_window(
-        pos: int,
-        min_window: int = 3,
-        max_window: int = 5,
-    ) -> int:
-        volatility_quantile = calculate_volatility_quantile(pos)
-        if np.isnan(volatility_quantile):
-            return int(round(median([min_window, max_window])))
-
-        return np.clip(
-            round(min_window + (max_window - min_window) * volatility_quantile),
-            min_window,
-            max_window,
-        ).astype(int)
-
     def calculate_slopes_ok_threshold(
         pos: int,
-        min_threshold: float = 0.5,
-        max_threshold: float = 0.75,
+        min_threshold: float = 0.65,
+        max_threshold: float = 0.85,
     ) -> float:
         volatility_quantile = calculate_volatility_quantile(pos)
         if np.isnan(volatility_quantile):
@@ -465,30 +449,6 @@ def zigzag(
 
         return min_threshold + (max_threshold - min_threshold) * volatility_quantile
 
-    @lru_cache(maxsize=4096)
-    def calculate_slopes_ok_min_max(slopes_ok_threshold: float) -> tuple[int, int]:
-        min_slope_bound1 = math.ceil(1 / slopes_ok_threshold)
-        min_slope_bound2 = math.ceil(1 / (1 - slopes_ok_threshold))
-        return min(min_slope_bound1, min_slope_bound2), max(
-            min_slope_bound1, min_slope_bound2
-        )
-
-    def calculate_min_slopes_ok(pos: int, slopes_ok_threshold: float) -> int:
-        min_slopes_ok, max_slopes_ok = calculate_slopes_ok_min_max(slopes_ok_threshold)
-        if min_slopes_ok == max_slopes_ok:
-            return min_slopes_ok
-        volatility_quantile = calculate_volatility_quantile(pos)
-        if np.isnan(volatility_quantile):
-            return int(round(median([min_slopes_ok, max_slopes_ok])))
-
-        return np.clip(
-            round(
-                min_slopes_ok + (max_slopes_ok - min_slopes_ok) * volatility_quantile
-            ),
-            min_slopes_ok,
-            max_slopes_ok,
-        ).astype(int)
-
     def update_candidate_pivot(pos: int, value: float):
         nonlocal candidate_pivot_pos, candidate_pivot_value
         if 0 <= pos < n:
@@ -511,51 +471,39 @@ def zigzag(
         last_pivot_pos = pos
         reset_candidate_pivot()
 
-    slope_ok_cache: dict[tuple[int, int, bool, int, float], bool] = {}
+    slope_ok_cache: dict[tuple[int, int, int, float]] = {}
 
     def get_slope_ok(
         pos: int,
+        candidate_pivot_pos: int,
         direction: TrendDirection,
-        enable_weighting: bool,
-        slope_confirmation_window: int,
         min_slope: float,
     ) -> bool:
         cache_key = (
             pos,
+            candidate_pivot_pos,
             direction.value,
-            enable_weighting,
-            slope_confirmation_window,
             min_slope,
         )
 
         if cache_key in slope_ok_cache:
             return slope_ok_cache[cache_key]
 
-        next_start = pos
-        next_end = min(next_start + slope_confirmation_window, n)
-        next_closes = closes[next_start:next_end]
-
-        if len(next_closes) < 2:
+        if pos <= candidate_pivot_pos:
             slope_ok_cache[cache_key] = False
             return slope_ok_cache[cache_key]
 
-        log_next_closes = np.log(next_closes)
-        log_next_closes_length = len(log_next_closes)
+        log_candidate_pivot_close = np.log(closes[candidate_pivot_pos])
+        log_current_close = np.log(closes[pos])
 
-        polyfit_kwargs = {}
-        if enable_weighting:
-            polyfit_kwargs["w"] = np.linspace(0.5, 1.5, log_next_closes_length)
-        log_next_slope = np.polyfit(
-            range(log_next_closes_length),
-            log_next_closes,
-            1,
-            **polyfit_kwargs,
-        )[0]
+        log_slope_close = (log_current_close - log_candidate_pivot_close) / (
+            pos - candidate_pivot_pos
+        )
 
         if direction == TrendDirection.DOWN:
-            slope_ok_cache[cache_key] = log_next_slope < -min_slope
+            slope_ok_cache[cache_key] = log_slope_close < -min_slope
         elif direction == TrendDirection.UP:
-            slope_ok_cache[cache_key] = log_next_slope > min_slope
+            slope_ok_cache[cache_key] = log_slope_close > min_slope
         else:
             slope_ok_cache[cache_key] = False
 
@@ -565,37 +513,37 @@ def zigzag(
         pos: int,
         candidate_pivot_pos: int,
         direction: TrendDirection,
-        enable_weighting: bool = False,
         min_slope: float = np.finfo(float).eps,
+        alpha: float = 0.05,
     ) -> bool:
-        slope_confirmation_window = calculate_slope_confirmation_window(
-            candidate_pivot_pos
-        )
+        start_pos = candidate_pivot_pos + 1
+        end_pos = min(pos + 1, n)
+        n_slopes = max(0, end_pos - start_pos)
+
+        if n_slopes < 1:
+            return False
 
         slopes_ok: list[bool] = []
-        for i in range(candidate_pivot_pos + 1, min(pos + 1, n)):
+        for i in range(start_pos, end_pos):
             slopes_ok.append(
                 get_slope_ok(
                     pos=i,
+                    candidate_pivot_pos=candidate_pivot_pos,
                     direction=direction,
-                    enable_weighting=enable_weighting,
-                    slope_confirmation_window=slope_confirmation_window,
                     min_slope=min_slope,
                 )
             )
 
         slopes_ok_threshold = calculate_slopes_ok_threshold(candidate_pivot_pos)
-        min_slopes_ok = calculate_min_slopes_ok(
-            candidate_pivot_pos, slopes_ok_threshold
+        n_slopes_ok = sum(slopes_ok)
+        binomtest = sp.stats.binomtest(
+            k=n_slopes_ok, n=n_slopes, p=0.5, alternative="greater"
         )
-        n_slopes_ok = len(slopes_ok)
-        if n_slopes_ok > 0:
-            return (
-                n_slopes_ok >= min_slopes_ok
-                and sum(slopes_ok) / n_slopes_ok >= slopes_ok_threshold
-            )
 
-        return False
+        return (
+            binomtest.pvalue <= alpha
+            and (n_slopes_ok / n_slopes) >= slopes_ok_threshold
+        )
 
     start_pos = 0
     initial_high_pos = start_pos