]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
perf(qav3): speed up pivots labeling
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 15 Jun 2025 18:42:22 +0000 (20:42 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 15 Jun 2025 18:42:22 +0000 (20:42 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index 33db07b9e081e0005c043a8477c975a95c9894ed..76b6492274d59e07062513bf3819e43a45945e55 100644 (file)
@@ -2,6 +2,7 @@ import copy
 from enum import IntEnum
 import logging
 import json
+import math
 import random
 from statistics import median
 import time
@@ -49,7 +50,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.7.89"
+    version = "3.7.90"
 
     @cached_property
     def _optuna_config(self) -> dict:
@@ -1319,11 +1320,30 @@ def zigzag(
             return int(round(median([min_window, max_window])))
 
         return np.clip(
-            round(max_window - (max_window - min_window) * volatility_quantile),
+            round(min_window + (max_window - min_window) * volatility_quantile),
             min_window,
             max_window,
         ).astype(int)
 
+    def calculate_min_slopes_ok(pos: int, slopes_ok_threshold: float) -> int:
+        min_slopes_ok = max(
+            math.ceil(1 / slopes_ok_threshold),
+            math.ceil(1 / (1 - slopes_ok_threshold)),
+            4,
+        )
+        max_slopes_ok = math.ceil(min_slopes_ok * 2)
+        volatility_quantile = calculate_volatility_quantile(pos)
+        if np.isnan(volatility_quantile):
+            return int(round(median([min_slopes_ok, max_slopes_ok])))
+
+        return np.clip(
+            round(
+                min_slopes_ok + (max_slopes_ok - min_slopes_ok) * volatility_quantile
+            ),
+            min_slopes_ok,
+            max_slopes_ok,
+        ).astype(int)
+
     def update_candidate_pivot(pos: int, value: float):
         nonlocal candidate_pivot_pos, candidate_pivot_value
         if 0 <= pos < n:
@@ -1346,11 +1366,62 @@ def zigzag(
         last_pivot_pos = pos
         reset_candidate_pivot()
 
+    slopes_ok_cache: dict[tuple[int, int, int], bool] = {}
+
+    def get_slope_ok(
+        pos: int,
+        direction: TrendDirection,
+        enable_weighting: bool,
+        slope_confirmation_window: int,
+        min_slope: float,
+    ) -> bool:
+        cache_key = (
+            pos,
+            direction.value,
+            enable_weighting,
+            slope_confirmation_window,
+            min_slope,
+        )
+
+        if cache_key in slopes_ok_cache:
+            return slopes_ok_cache[cache_key]
+
+        next_start = pos
+        next_end = min(next_start + slope_confirmation_window, n)
+        next_closes = closes[next_start:next_end]
+
+        if len(next_closes) < 2:
+            slopes_ok_cache[cache_key] = False
+            return slopes_ok_cache[cache_key]
+
+        log_next_closes = np.log(next_closes)
+        log_next_closes_length = len(log_next_closes)
+
+        polyfit_kwargs = {}
+        if enable_weighting:
+            polyfit_kwargs = {"w": np.linspace(0.5, 1.5, log_next_closes_length)}
+        log_next_slope = np.polyfit(
+            range(log_next_closes_length),
+            log_next_closes,
+            1,
+            **polyfit_kwargs,
+        )[0]
+
+        if direction == TrendDirection.DOWN:
+            slopes_ok_cache[cache_key] = log_next_slope < -min_slope
+        elif direction == TrendDirection.UP:
+            slopes_ok_cache[cache_key] = log_next_slope > min_slope
+        else:
+            slopes_ok_cache[cache_key] = False
+
+        return slopes_ok_cache[cache_key]
+
     def is_pivot_confirmed(
+        pos: int,
         candidate_pivot_pos: int,
         direction: TrendDirection,
-        min_slope: float = np.finfo(float).eps,
         enable_weighting: bool = False,
+        min_slope: float = np.finfo(float).eps,
         slopes_ok_threshold: float = 0.75,
     ) -> bool:
         slope_confirmation_window = calculate_slope_confirmation_window(
@@ -1358,36 +1429,26 @@ def zigzag(
         )
 
         slopes_ok: list[bool] = []
+        for i in range(candidate_pivot_pos + 1, min(pos + 1, n)):
+            slopes_ok.append(
+                get_slope_ok(
+                    pos=i,
+                    direction=direction,
+                    enable_weighting=enable_weighting,
+                    slope_confirmation_window=slope_confirmation_window,
+                    min_slope=min_slope,
+                )
+            )
 
-        for i in range(candidate_pivot_pos + 1, n):
-            next_start = i
-            next_end = min(next_start + slope_confirmation_window, n)
-
-            next_closes = closes[next_start:next_end]
-
-            if len(next_closes) >= 2:
-                log_next_closes = np.log(next_closes)
-                log_next_closes_length = len(log_next_closes)
-                polyfit_kwargs = {}
-                if enable_weighting:
-                    polyfit_kwargs = {
-                        "w": np.linspace(0.5, 1.5, log_next_closes_length)
-                    }
-                log_next_slope = np.polyfit(
-                    range(log_next_closes_length),
-                    log_next_closes,
-                    1,
-                    **polyfit_kwargs,
-                )[0]
-                if direction == TrendDirection.DOWN:
-                    slopes_ok.append(log_next_slope < -min_slope)
-                elif direction == TrendDirection.UP:
-                    slopes_ok.append(log_next_slope > min_slope)
-            else:
-                slopes_ok.append(False)
-
-            if sum(slopes_ok) / len(slopes_ok) >= slopes_ok_threshold:
-                return True
+        min_slopes_ok = calculate_min_slopes_ok(
+            candidate_pivot_pos, slopes_ok_threshold
+        )
+        n_slopes_ok = len(slopes_ok)
+        if n_slopes_ok > 0:
+            return (
+                n_slopes_ok >= min_slopes_ok
+                and sum(slopes_ok) / n_slopes_ok >= slopes_ok_threshold
+            )
 
         return False
 
@@ -1444,7 +1505,7 @@ def zigzag(
                 candidate_pivot_value - current_low
             ) / candidate_pivot_value >= thresholds[
                 candidate_pivot_pos
-            ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.DOWN):
+            ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.DOWN):
                 add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
                 state = TrendDirection.DOWN
 
@@ -1455,7 +1516,7 @@ def zigzag(
                 current_high - candidate_pivot_value
             ) / candidate_pivot_value >= thresholds[
                 candidate_pivot_pos
-            ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.UP):
+            ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.UP):
                 add_pivot(
                     candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
                 )
index e7ebbdb18b9ecf089218e3329910d7914789b93a..ca0cc7970854819667806565460929d752e2782a 100644 (file)
@@ -64,7 +64,7 @@ class QuickAdapterV3(IStrategy):
     INTERFACE_VERSION = 3
 
     def version(self) -> str:
-        return "3.3.93"
+        return "3.3.94"
 
     timeframe = "5m"
 
index e9d144ad6727026d43be47e9b455fcb58dd6d14e..2dcb20ef52f144364072b883833675c5224b7def 100644 (file)
@@ -1,5 +1,6 @@
 from enum import IntEnum
 from functools import lru_cache
+import math
 from statistics import median
 import numpy as np
 import pandas as pd
@@ -443,11 +444,30 @@ def zigzag(
             return int(round(median([min_window, max_window])))
 
         return np.clip(
-            round(max_window - (max_window - min_window) * volatility_quantile),
+            round(min_window + (max_window - min_window) * volatility_quantile),
             min_window,
             max_window,
         ).astype(int)
 
+    def calculate_min_slopes_ok(pos: int, slopes_ok_threshold: float) -> int:
+        min_slopes_ok = max(
+            math.ceil(1 / slopes_ok_threshold),
+            math.ceil(1 / (1 - slopes_ok_threshold)),
+            4,
+        )
+        max_slopes_ok = math.ceil(min_slopes_ok * 2)
+        volatility_quantile = calculate_volatility_quantile(pos)
+        if np.isnan(volatility_quantile):
+            return int(round(median([min_slopes_ok, max_slopes_ok])))
+
+        return np.clip(
+            round(
+                min_slopes_ok + (max_slopes_ok - min_slopes_ok) * volatility_quantile
+            ),
+            min_slopes_ok,
+            max_slopes_ok,
+        ).astype(int)
+
     def update_candidate_pivot(pos: int, value: float):
         nonlocal candidate_pivot_pos, candidate_pivot_value
         if 0 <= pos < n:
@@ -470,11 +490,62 @@ def zigzag(
         last_pivot_pos = pos
         reset_candidate_pivot()
 
+    slopes_ok_cache: dict[tuple[int, int, int], bool] = {}
+
+    def get_slope_ok(
+        pos: int,
+        direction: TrendDirection,
+        enable_weighting: bool,
+        slope_confirmation_window: int,
+        min_slope: float,
+    ) -> bool:
+        cache_key = (
+            pos,
+            direction.value,
+            enable_weighting,
+            slope_confirmation_window,
+            min_slope,
+        )
+
+        if cache_key in slopes_ok_cache:
+            return slopes_ok_cache[cache_key]
+
+        next_start = pos
+        next_end = min(next_start + slope_confirmation_window, n)
+        next_closes = closes[next_start:next_end]
+
+        if len(next_closes) < 2:
+            slopes_ok_cache[cache_key] = False
+            return slopes_ok_cache[cache_key]
+
+        log_next_closes = np.log(next_closes)
+        log_next_closes_length = len(log_next_closes)
+
+        polyfit_kwargs = {}
+        if enable_weighting:
+            polyfit_kwargs = {"w": np.linspace(0.5, 1.5, log_next_closes_length)}
+        log_next_slope = np.polyfit(
+            range(log_next_closes_length),
+            log_next_closes,
+            1,
+            **polyfit_kwargs,
+        )[0]
+
+        if direction == TrendDirection.DOWN:
+            slopes_ok_cache[cache_key] = log_next_slope < -min_slope
+        elif direction == TrendDirection.UP:
+            slopes_ok_cache[cache_key] = log_next_slope > min_slope
+        else:
+            slopes_ok_cache[cache_key] = False
+
+        return slopes_ok_cache[cache_key]
+
     def is_pivot_confirmed(
+        pos: int,
         candidate_pivot_pos: int,
         direction: TrendDirection,
-        min_slope: float = np.finfo(float).eps,
         enable_weighting: bool = False,
+        min_slope: float = np.finfo(float).eps,
         slopes_ok_threshold: float = 0.75,
     ) -> bool:
         slope_confirmation_window = calculate_slope_confirmation_window(
@@ -482,36 +553,26 @@ def zigzag(
         )
 
         slopes_ok: list[bool] = []
+        for i in range(candidate_pivot_pos + 1, min(pos + 1, n)):
+            slopes_ok.append(
+                get_slope_ok(
+                    pos=i,
+                    direction=direction,
+                    enable_weighting=enable_weighting,
+                    slope_confirmation_window=slope_confirmation_window,
+                    min_slope=min_slope,
+                )
+            )
 
-        for i in range(candidate_pivot_pos + 1, n):
-            next_start = i
-            next_end = min(next_start + slope_confirmation_window, n)
-
-            next_closes = closes[next_start:next_end]
-
-            if len(next_closes) >= 2:
-                log_next_closes = np.log(next_closes)
-                log_next_closes_length = len(log_next_closes)
-                polyfit_kwargs = {}
-                if enable_weighting:
-                    polyfit_kwargs = {
-                        "w": np.linspace(0.5, 1.5, log_next_closes_length)
-                    }
-                log_next_slope = np.polyfit(
-                    range(log_next_closes_length),
-                    log_next_closes,
-                    1,
-                    **polyfit_kwargs,
-                )[0]
-                if direction == TrendDirection.DOWN:
-                    slopes_ok.append(log_next_slope < -min_slope)
-                elif direction == TrendDirection.UP:
-                    slopes_ok.append(log_next_slope > min_slope)
-            else:
-                slopes_ok.append(False)
-
-            if sum(slopes_ok) / len(slopes_ok) >= slopes_ok_threshold:
-                return True
+        min_slopes_ok = calculate_min_slopes_ok(
+            candidate_pivot_pos, slopes_ok_threshold
+        )
+        n_slopes_ok = len(slopes_ok)
+        if n_slopes_ok > 0:
+            return (
+                n_slopes_ok >= min_slopes_ok
+                and sum(slopes_ok) / n_slopes_ok >= slopes_ok_threshold
+            )
 
         return False
 
@@ -568,7 +629,7 @@ def zigzag(
                 candidate_pivot_value - current_low
             ) / candidate_pivot_value >= thresholds[
                 candidate_pivot_pos
-            ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.DOWN):
+            ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.DOWN):
                 add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
                 state = TrendDirection.DOWN
 
@@ -579,7 +640,7 @@ def zigzag(
                 current_high - candidate_pivot_value
             ) / candidate_pivot_value >= thresholds[
                 candidate_pivot_pos
-            ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.UP):
+            ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.UP):
                 add_pivot(
                     candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
                 )