From 4347e4038bd698d94a5327c6dc5790243f8efb97 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 15 Jun 2025 20:42:22 +0200 Subject: [PATCH] perf(qav3): speed up pivots labeling MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../freqaimodels/QuickAdapterRegressorV3.py | 129 +++++++++++++----- .../user_data/strategies/QuickAdapterV3.py | 2 +- quickadapter/user_data/strategies/Utils.py | 127 ++++++++++++----- 3 files changed, 190 insertions(+), 68 deletions(-) diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index 33db07b..76b6492 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -2,6 +2,7 @@ import copy from enum import IntEnum import logging import json +import math import random from statistics import median import time @@ -49,7 +50,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): https://github.com/sponsors/robcaulk """ - version = "3.7.89" + version = "3.7.90" @cached_property def _optuna_config(self) -> dict: @@ -1319,11 +1320,30 @@ def zigzag( return int(round(median([min_window, max_window]))) return np.clip( - round(max_window - (max_window - min_window) * volatility_quantile), + round(min_window + (max_window - min_window) * volatility_quantile), min_window, max_window, ).astype(int) + def calculate_min_slopes_ok(pos: int, slopes_ok_threshold: float) -> int: + min_slopes_ok = max( + math.ceil(1 / slopes_ok_threshold), + math.ceil(1 / (1 - slopes_ok_threshold)), + 4, + ) + max_slopes_ok = math.ceil(min_slopes_ok * 2) + volatility_quantile = calculate_volatility_quantile(pos) + if np.isnan(volatility_quantile): + return int(round(median([min_slopes_ok, max_slopes_ok]))) + + return np.clip( + round( + min_slopes_ok + (max_slopes_ok - min_slopes_ok) * volatility_quantile + ), + min_slopes_ok, + max_slopes_ok, + ).astype(int) + def update_candidate_pivot(pos: int, value: float): nonlocal candidate_pivot_pos, candidate_pivot_value if 0 <= pos < n: @@ -1346,11 +1366,62 @@ def zigzag( last_pivot_pos = pos reset_candidate_pivot() + slopes_ok_cache: dict[tuple[int, int, int], bool] = {} + + def get_slope_ok( + pos: int, + direction: TrendDirection, + enable_weighting: bool, + slope_confirmation_window: int, + min_slope: float, + ) -> bool: + cache_key = ( + pos, + direction.value, + enable_weighting, + slope_confirmation_window, + min_slope, + ) + + if cache_key in slopes_ok_cache: + return slopes_ok_cache[cache_key] + + next_start = pos + next_end = min(next_start + slope_confirmation_window, n) + next_closes = closes[next_start:next_end] + + if len(next_closes) < 2: + slopes_ok_cache[cache_key] = False + return slopes_ok_cache[cache_key] + + log_next_closes = np.log(next_closes) + log_next_closes_length = len(log_next_closes) + + polyfit_kwargs = {} + if enable_weighting: + polyfit_kwargs = {"w": np.linspace(0.5, 1.5, log_next_closes_length)} + log_next_slope = np.polyfit( + range(log_next_closes_length), + log_next_closes, + 1, + **polyfit_kwargs, + )[0] + + if direction == TrendDirection.DOWN: + slopes_ok_cache[cache_key] = log_next_slope < -min_slope + elif direction == TrendDirection.UP: + slopes_ok_cache[cache_key] = log_next_slope > min_slope + else: + slopes_ok_cache[cache_key] = False + + return slopes_ok_cache[cache_key] + def is_pivot_confirmed( + pos: int, candidate_pivot_pos: int, direction: TrendDirection, - min_slope: float = np.finfo(float).eps, enable_weighting: bool = False, + min_slope: float = np.finfo(float).eps, slopes_ok_threshold: float = 0.75, ) -> bool: slope_confirmation_window = calculate_slope_confirmation_window( @@ -1358,36 +1429,26 @@ def zigzag( ) slopes_ok: list[bool] = [] + for i in range(candidate_pivot_pos + 1, min(pos + 1, n)): + slopes_ok.append( + get_slope_ok( + pos=i, + direction=direction, + enable_weighting=enable_weighting, + slope_confirmation_window=slope_confirmation_window, + min_slope=min_slope, + ) + ) - for i in range(candidate_pivot_pos + 1, n): - next_start = i - next_end = min(next_start + slope_confirmation_window, n) - - next_closes = closes[next_start:next_end] - - if len(next_closes) >= 2: - log_next_closes = np.log(next_closes) - log_next_closes_length = len(log_next_closes) - polyfit_kwargs = {} - if enable_weighting: - polyfit_kwargs = { - "w": np.linspace(0.5, 1.5, log_next_closes_length) - } - log_next_slope = np.polyfit( - range(log_next_closes_length), - log_next_closes, - 1, - **polyfit_kwargs, - )[0] - if direction == TrendDirection.DOWN: - slopes_ok.append(log_next_slope < -min_slope) - elif direction == TrendDirection.UP: - slopes_ok.append(log_next_slope > min_slope) - else: - slopes_ok.append(False) - - if sum(slopes_ok) / len(slopes_ok) >= slopes_ok_threshold: - return True + min_slopes_ok = calculate_min_slopes_ok( + candidate_pivot_pos, slopes_ok_threshold + ) + n_slopes_ok = len(slopes_ok) + if n_slopes_ok > 0: + return ( + n_slopes_ok >= min_slopes_ok + and sum(slopes_ok) / n_slopes_ok >= slopes_ok_threshold + ) return False @@ -1444,7 +1505,7 @@ def zigzag( candidate_pivot_value - current_low ) / candidate_pivot_value >= thresholds[ candidate_pivot_pos - ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.DOWN): + ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.DOWN): add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP) state = TrendDirection.DOWN @@ -1455,7 +1516,7 @@ def zigzag( current_high - candidate_pivot_value ) / candidate_pivot_value >= thresholds[ candidate_pivot_pos - ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.UP): + ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.UP): add_pivot( candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN ) diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index e7ebbdb..ca0cc79 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -64,7 +64,7 @@ class QuickAdapterV3(IStrategy): INTERFACE_VERSION = 3 def version(self) -> str: - return "3.3.93" + return "3.3.94" timeframe = "5m" diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index e9d144a..2dcb20e 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -1,5 +1,6 @@ from enum import IntEnum from functools import lru_cache +import math from statistics import median import numpy as np import pandas as pd @@ -443,11 +444,30 @@ def zigzag( return int(round(median([min_window, max_window]))) return np.clip( - round(max_window - (max_window - min_window) * volatility_quantile), + round(min_window + (max_window - min_window) * volatility_quantile), min_window, max_window, ).astype(int) + def calculate_min_slopes_ok(pos: int, slopes_ok_threshold: float) -> int: + min_slopes_ok = max( + math.ceil(1 / slopes_ok_threshold), + math.ceil(1 / (1 - slopes_ok_threshold)), + 4, + ) + max_slopes_ok = math.ceil(min_slopes_ok * 2) + volatility_quantile = calculate_volatility_quantile(pos) + if np.isnan(volatility_quantile): + return int(round(median([min_slopes_ok, max_slopes_ok]))) + + return np.clip( + round( + min_slopes_ok + (max_slopes_ok - min_slopes_ok) * volatility_quantile + ), + min_slopes_ok, + max_slopes_ok, + ).astype(int) + def update_candidate_pivot(pos: int, value: float): nonlocal candidate_pivot_pos, candidate_pivot_value if 0 <= pos < n: @@ -470,11 +490,62 @@ def zigzag( last_pivot_pos = pos reset_candidate_pivot() + slopes_ok_cache: dict[tuple[int, int, int], bool] = {} + + def get_slope_ok( + pos: int, + direction: TrendDirection, + enable_weighting: bool, + slope_confirmation_window: int, + min_slope: float, + ) -> bool: + cache_key = ( + pos, + direction.value, + enable_weighting, + slope_confirmation_window, + min_slope, + ) + + if cache_key in slopes_ok_cache: + return slopes_ok_cache[cache_key] + + next_start = pos + next_end = min(next_start + slope_confirmation_window, n) + next_closes = closes[next_start:next_end] + + if len(next_closes) < 2: + slopes_ok_cache[cache_key] = False + return slopes_ok_cache[cache_key] + + log_next_closes = np.log(next_closes) + log_next_closes_length = len(log_next_closes) + + polyfit_kwargs = {} + if enable_weighting: + polyfit_kwargs = {"w": np.linspace(0.5, 1.5, log_next_closes_length)} + log_next_slope = np.polyfit( + range(log_next_closes_length), + log_next_closes, + 1, + **polyfit_kwargs, + )[0] + + if direction == TrendDirection.DOWN: + slopes_ok_cache[cache_key] = log_next_slope < -min_slope + elif direction == TrendDirection.UP: + slopes_ok_cache[cache_key] = log_next_slope > min_slope + else: + slopes_ok_cache[cache_key] = False + + return slopes_ok_cache[cache_key] + def is_pivot_confirmed( + pos: int, candidate_pivot_pos: int, direction: TrendDirection, - min_slope: float = np.finfo(float).eps, enable_weighting: bool = False, + min_slope: float = np.finfo(float).eps, slopes_ok_threshold: float = 0.75, ) -> bool: slope_confirmation_window = calculate_slope_confirmation_window( @@ -482,36 +553,26 @@ def zigzag( ) slopes_ok: list[bool] = [] + for i in range(candidate_pivot_pos + 1, min(pos + 1, n)): + slopes_ok.append( + get_slope_ok( + pos=i, + direction=direction, + enable_weighting=enable_weighting, + slope_confirmation_window=slope_confirmation_window, + min_slope=min_slope, + ) + ) - for i in range(candidate_pivot_pos + 1, n): - next_start = i - next_end = min(next_start + slope_confirmation_window, n) - - next_closes = closes[next_start:next_end] - - if len(next_closes) >= 2: - log_next_closes = np.log(next_closes) - log_next_closes_length = len(log_next_closes) - polyfit_kwargs = {} - if enable_weighting: - polyfit_kwargs = { - "w": np.linspace(0.5, 1.5, log_next_closes_length) - } - log_next_slope = np.polyfit( - range(log_next_closes_length), - log_next_closes, - 1, - **polyfit_kwargs, - )[0] - if direction == TrendDirection.DOWN: - slopes_ok.append(log_next_slope < -min_slope) - elif direction == TrendDirection.UP: - slopes_ok.append(log_next_slope > min_slope) - else: - slopes_ok.append(False) - - if sum(slopes_ok) / len(slopes_ok) >= slopes_ok_threshold: - return True + min_slopes_ok = calculate_min_slopes_ok( + candidate_pivot_pos, slopes_ok_threshold + ) + n_slopes_ok = len(slopes_ok) + if n_slopes_ok > 0: + return ( + n_slopes_ok >= min_slopes_ok + and sum(slopes_ok) / n_slopes_ok >= slopes_ok_threshold + ) return False @@ -568,7 +629,7 @@ def zigzag( candidate_pivot_value - current_low ) / candidate_pivot_value >= thresholds[ candidate_pivot_pos - ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.DOWN): + ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.DOWN): add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP) state = TrendDirection.DOWN @@ -579,7 +640,7 @@ def zigzag( current_high - candidate_pivot_value ) / candidate_pivot_value >= thresholds[ candidate_pivot_pos - ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.UP): + ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.UP): add_pivot( candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN ) -- 2.43.0