From: Jérôme Benoit Date: Mon, 16 Jun 2025 18:49:34 +0000 (+0200) Subject: fix(qav3): ensure pullbacks can't invalidate valid pivots at labeling X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=7f5ba503aac05102373ed70998334cdf1fb8a240;p=freqai-strategies.git fix(qav3): ensure pullbacks can't invalidate valid pivots at labeling Signed-off-by: Jérôme Benoit --- diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index 304d206..a6b7c1b 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -2,7 +2,6 @@ import copy from enum import IntEnum import logging import json -import math import random from statistics import median import time @@ -14,7 +13,7 @@ import sklearn import warnings import talib.abstract as ta -from functools import cached_property, lru_cache +from functools import cached_property from typing import Any, Callable, Optional from pathlib import Path from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel @@ -50,7 +49,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): https://github.com/sponsors/robcaulk """ - version = "3.7.92" + version = "3.7.93" @cached_property def _optuna_config(self) -> dict: @@ -1310,25 +1309,10 @@ def zigzag( return volatility_quantile_cache[pos] - def calculate_slope_confirmation_window( - pos: int, - min_window: int = 3, - max_window: int = 5, - ) -> int: - volatility_quantile = calculate_volatility_quantile(pos) - if np.isnan(volatility_quantile): - return int(round(median([min_window, max_window]))) - - return np.clip( - round(min_window + (max_window - min_window) * volatility_quantile), - min_window, - max_window, - ).astype(int) - def calculate_slopes_ok_threshold( pos: int, - min_threshold: float = 0.5, - max_threshold: float = 0.75, + min_threshold: float = 0.65, + max_threshold: float = 0.85, ) -> float: volatility_quantile = calculate_volatility_quantile(pos) if np.isnan(volatility_quantile): @@ -1336,30 +1320,6 @@ def zigzag( return min_threshold + (max_threshold - min_threshold) * volatility_quantile - @lru_cache(maxsize=4096) - def calculate_slopes_ok_min_max(slopes_ok_threshold: float) -> tuple[int, int]: - min_slope_bound1 = math.ceil(1 / slopes_ok_threshold) - min_slope_bound2 = math.ceil(1 / (1 - slopes_ok_threshold)) - return min(min_slope_bound1, min_slope_bound2), max( - min_slope_bound1, min_slope_bound2 - ) - - def calculate_min_slopes_ok(pos: int, slopes_ok_threshold: float) -> int: - min_slopes_ok, max_slopes_ok = calculate_slopes_ok_min_max(slopes_ok_threshold) - if min_slopes_ok == max_slopes_ok: - return min_slopes_ok - volatility_quantile = calculate_volatility_quantile(pos) - if np.isnan(volatility_quantile): - return int(round(median([min_slopes_ok, max_slopes_ok]))) - - return np.clip( - round( - min_slopes_ok + (max_slopes_ok - min_slopes_ok) * volatility_quantile - ), - min_slopes_ok, - max_slopes_ok, - ).astype(int) - def update_candidate_pivot(pos: int, value: float): nonlocal candidate_pivot_pos, candidate_pivot_value if 0 <= pos < n: @@ -1382,51 +1342,39 @@ def zigzag( last_pivot_pos = pos reset_candidate_pivot() - slope_ok_cache: dict[tuple[int, int, bool, int, float], bool] = {} + slope_ok_cache: dict[tuple[int, int, int, float]] = {} def get_slope_ok( pos: int, + candidate_pivot_pos: int, direction: TrendDirection, - enable_weighting: bool, - slope_confirmation_window: int, min_slope: float, ) -> bool: cache_key = ( pos, + candidate_pivot_pos, direction.value, - enable_weighting, - slope_confirmation_window, min_slope, ) if cache_key in slope_ok_cache: return slope_ok_cache[cache_key] - next_start = pos - next_end = min(next_start + slope_confirmation_window, n) - next_closes = closes[next_start:next_end] - - if len(next_closes) < 2: + if pos <= candidate_pivot_pos: slope_ok_cache[cache_key] = False return slope_ok_cache[cache_key] - log_next_closes = np.log(next_closes) - log_next_closes_length = len(log_next_closes) + log_candidate_pivot_close = np.log(closes[candidate_pivot_pos]) + log_current_close = np.log(closes[pos]) - polyfit_kwargs = {} - if enable_weighting: - polyfit_kwargs["w"] = np.linspace(0.5, 1.5, log_next_closes_length) - log_next_slope = np.polyfit( - range(log_next_closes_length), - log_next_closes, - 1, - **polyfit_kwargs, - )[0] + log_slope_close = (log_current_close - log_candidate_pivot_close) / ( + pos - candidate_pivot_pos + ) if direction == TrendDirection.DOWN: - slope_ok_cache[cache_key] = log_next_slope < -min_slope + slope_ok_cache[cache_key] = log_slope_close < -min_slope elif direction == TrendDirection.UP: - slope_ok_cache[cache_key] = log_next_slope > min_slope + slope_ok_cache[cache_key] = log_slope_close > min_slope else: slope_ok_cache[cache_key] = False @@ -1436,37 +1384,37 @@ def zigzag( pos: int, candidate_pivot_pos: int, direction: TrendDirection, - enable_weighting: bool = False, min_slope: float = np.finfo(float).eps, + alpha: float = 0.05, ) -> bool: - slope_confirmation_window = calculate_slope_confirmation_window( - candidate_pivot_pos - ) + start_pos = candidate_pivot_pos + 1 + end_pos = min(pos + 1, n) + n_slopes = max(0, end_pos - start_pos) + + if n_slopes < 1: + return False slopes_ok: list[bool] = [] - for i in range(candidate_pivot_pos + 1, min(pos + 1, n)): + for i in range(start_pos, end_pos): slopes_ok.append( get_slope_ok( pos=i, + candidate_pivot_pos=candidate_pivot_pos, direction=direction, - enable_weighting=enable_weighting, - slope_confirmation_window=slope_confirmation_window, min_slope=min_slope, ) ) slopes_ok_threshold = calculate_slopes_ok_threshold(candidate_pivot_pos) - min_slopes_ok = calculate_min_slopes_ok( - candidate_pivot_pos, slopes_ok_threshold + n_slopes_ok = sum(slopes_ok) + binomtest = sp.stats.binomtest( + k=n_slopes_ok, n=n_slopes, p=0.5, alternative="greater" ) - n_slopes_ok = len(slopes_ok) - if n_slopes_ok > 0: - return ( - n_slopes_ok >= min_slopes_ok - and sum(slopes_ok) / n_slopes_ok >= slopes_ok_threshold - ) - return False + return ( + binomtest.pvalue <= alpha + and (n_slopes_ok / n_slopes) >= slopes_ok_threshold + ) start_pos = 0 initial_high_pos = start_pos diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index 0cdadd5..334c1b3 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -64,7 +64,7 @@ class QuickAdapterV3(IStrategy): INTERFACE_VERSION = 3 def version(self) -> str: - return "3.3.96" + return "3.3.97" timeframe = "5m" diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index d4e61fe..6543564 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -1,6 +1,5 @@ from enum import IntEnum from functools import lru_cache -import math from statistics import median import numpy as np import pandas as pd @@ -439,25 +438,10 @@ def zigzag( return volatility_quantile_cache[pos] - def calculate_slope_confirmation_window( - pos: int, - min_window: int = 3, - max_window: int = 5, - ) -> int: - volatility_quantile = calculate_volatility_quantile(pos) - if np.isnan(volatility_quantile): - return int(round(median([min_window, max_window]))) - - return np.clip( - round(min_window + (max_window - min_window) * volatility_quantile), - min_window, - max_window, - ).astype(int) - def calculate_slopes_ok_threshold( pos: int, - min_threshold: float = 0.5, - max_threshold: float = 0.75, + min_threshold: float = 0.65, + max_threshold: float = 0.85, ) -> float: volatility_quantile = calculate_volatility_quantile(pos) if np.isnan(volatility_quantile): @@ -465,30 +449,6 @@ def zigzag( return min_threshold + (max_threshold - min_threshold) * volatility_quantile - @lru_cache(maxsize=4096) - def calculate_slopes_ok_min_max(slopes_ok_threshold: float) -> tuple[int, int]: - min_slope_bound1 = math.ceil(1 / slopes_ok_threshold) - min_slope_bound2 = math.ceil(1 / (1 - slopes_ok_threshold)) - return min(min_slope_bound1, min_slope_bound2), max( - min_slope_bound1, min_slope_bound2 - ) - - def calculate_min_slopes_ok(pos: int, slopes_ok_threshold: float) -> int: - min_slopes_ok, max_slopes_ok = calculate_slopes_ok_min_max(slopes_ok_threshold) - if min_slopes_ok == max_slopes_ok: - return min_slopes_ok - volatility_quantile = calculate_volatility_quantile(pos) - if np.isnan(volatility_quantile): - return int(round(median([min_slopes_ok, max_slopes_ok]))) - - return np.clip( - round( - min_slopes_ok + (max_slopes_ok - min_slopes_ok) * volatility_quantile - ), - min_slopes_ok, - max_slopes_ok, - ).astype(int) - def update_candidate_pivot(pos: int, value: float): nonlocal candidate_pivot_pos, candidate_pivot_value if 0 <= pos < n: @@ -511,51 +471,39 @@ def zigzag( last_pivot_pos = pos reset_candidate_pivot() - slope_ok_cache: dict[tuple[int, int, bool, int, float], bool] = {} + slope_ok_cache: dict[tuple[int, int, int, float]] = {} def get_slope_ok( pos: int, + candidate_pivot_pos: int, direction: TrendDirection, - enable_weighting: bool, - slope_confirmation_window: int, min_slope: float, ) -> bool: cache_key = ( pos, + candidate_pivot_pos, direction.value, - enable_weighting, - slope_confirmation_window, min_slope, ) if cache_key in slope_ok_cache: return slope_ok_cache[cache_key] - next_start = pos - next_end = min(next_start + slope_confirmation_window, n) - next_closes = closes[next_start:next_end] - - if len(next_closes) < 2: + if pos <= candidate_pivot_pos: slope_ok_cache[cache_key] = False return slope_ok_cache[cache_key] - log_next_closes = np.log(next_closes) - log_next_closes_length = len(log_next_closes) + log_candidate_pivot_close = np.log(closes[candidate_pivot_pos]) + log_current_close = np.log(closes[pos]) - polyfit_kwargs = {} - if enable_weighting: - polyfit_kwargs["w"] = np.linspace(0.5, 1.5, log_next_closes_length) - log_next_slope = np.polyfit( - range(log_next_closes_length), - log_next_closes, - 1, - **polyfit_kwargs, - )[0] + log_slope_close = (log_current_close - log_candidate_pivot_close) / ( + pos - candidate_pivot_pos + ) if direction == TrendDirection.DOWN: - slope_ok_cache[cache_key] = log_next_slope < -min_slope + slope_ok_cache[cache_key] = log_slope_close < -min_slope elif direction == TrendDirection.UP: - slope_ok_cache[cache_key] = log_next_slope > min_slope + slope_ok_cache[cache_key] = log_slope_close > min_slope else: slope_ok_cache[cache_key] = False @@ -565,37 +513,37 @@ def zigzag( pos: int, candidate_pivot_pos: int, direction: TrendDirection, - enable_weighting: bool = False, min_slope: float = np.finfo(float).eps, + alpha: float = 0.05, ) -> bool: - slope_confirmation_window = calculate_slope_confirmation_window( - candidate_pivot_pos - ) + start_pos = candidate_pivot_pos + 1 + end_pos = min(pos + 1, n) + n_slopes = max(0, end_pos - start_pos) + + if n_slopes < 1: + return False slopes_ok: list[bool] = [] - for i in range(candidate_pivot_pos + 1, min(pos + 1, n)): + for i in range(start_pos, end_pos): slopes_ok.append( get_slope_ok( pos=i, + candidate_pivot_pos=candidate_pivot_pos, direction=direction, - enable_weighting=enable_weighting, - slope_confirmation_window=slope_confirmation_window, min_slope=min_slope, ) ) slopes_ok_threshold = calculate_slopes_ok_threshold(candidate_pivot_pos) - min_slopes_ok = calculate_min_slopes_ok( - candidate_pivot_pos, slopes_ok_threshold + n_slopes_ok = sum(slopes_ok) + binomtest = sp.stats.binomtest( + k=n_slopes_ok, n=n_slopes, p=0.5, alternative="greater" ) - n_slopes_ok = len(slopes_ok) - if n_slopes_ok > 0: - return ( - n_slopes_ok >= min_slopes_ok - and sum(slopes_ok) / n_slopes_ok >= slopes_ok_threshold - ) - return False + return ( + binomtest.pvalue <= alpha + and (n_slopes_ok / n_slopes) >= slopes_ok_threshold + ) start_pos = 0 initial_high_pos = start_pos