From 05e11265123709c5ba458a939bf9109f914834a9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 5 May 2025 18:48:01 +0200 Subject: [PATCH] fix(qav3): untangle candidate pivot logic from confirmed pivot one MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../freqaimodels/QuickAdapterRegressorV3.py | 125 +++++++++++------- .../user_data/strategies/QuickAdapterV3.py | 25 ++-- quickadapter/user_data/strategies/Utils.py | 123 ++++++++++------- 3 files changed, 172 insertions(+), 101 deletions(-) diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index ed01c2b..6fac905 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -45,7 +45,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): https://github.com/sponsors/robcaulk """ - version = "3.7.28" + version = "3.7.29" @cached_property def _optuna_config(self) -> dict: @@ -850,10 +850,10 @@ def zigzag( df: pd.DataFrame, natr_period: int = 14, natr_ratio: float = 1.0, - confirmation_window: int = 3, + confirmation_window: int = 2, depth: int = 12, ) -> tuple[list[int], list[float], list[int]]: - if df.empty or len(df) < natr_period + confirmation_window: + if df.empty or len(df) < max(natr_period, 2 * confirmation_window + 1): return [], [], [] indices = df.index.tolist() @@ -865,8 +865,19 @@ def zigzag( lows = df["low"].values state: TrendDirection = TrendDirection.NEUTRAL - pivots_indices, pivots_values, pivots_directions = [], [], [] + last_pivot_pos = -depth - 1 + pivots_indices, pivots_values, pivots_directions = [], [], [] + + candidate_pivot_pos = -1 + candidate_pivot_value = np.nan + candidate_pivot_direction: TrendDirection = TrendDirection.NEUTRAL + + def update_candidate_pivot(pos: int, value: float, direction: TrendDirection): + nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction + candidate_pivot_pos = pos + candidate_pivot_value = value + candidate_pivot_direction = direction def add_pivot(pos: int, value: float, direction: TrendDirection): nonlocal last_pivot_pos @@ -876,21 +887,40 @@ def zigzag( pivots_values.append(value) pivots_directions.append(direction) last_pivot_pos = pos - - def update_last_pivot(pos: int, value: float, direction: TrendDirection): - if pivots_indices and indices[pos] != pivots_indices[-1]: - pivots_indices[-1] = indices[pos] - pivots_values[-1] = value - pivots_directions[-1] = direction + update_candidate_pivot( + pos, + value, + TrendDirection.UP + if direction == TrendDirection.DOWN + else TrendDirection.DOWN, + ) def is_reversal_confirmed(pos: int, direction: TrendDirection) -> bool: - if pos + confirmation_window >= len(df): + if pos - confirmation_window < 0 or pos + confirmation_window >= len(df): return False - next_closes = closes[pos + 1 : pos + confirmation_window + 1] + next_slice = slice(pos + 1, pos + confirmation_window + 1) + next_closes = closes[next_slice] + next_highs = highs[next_slice] + next_lows = lows[next_slice] + previous_slice = slice(pos - confirmation_window, pos) + previous_closes = closes[previous_slice] + previous_highs = highs[previous_slice] + previous_lows = lows[previous_slice] + if direction == TrendDirection.DOWN: - return np.all(next_closes < highs[pos]) + return ( + np.all(next_closes < highs[pos]) + and np.all(previous_closes < highs[pos]) + and np.all(next_highs <= highs[pos]) + and np.all(previous_highs <= highs[pos]) + ) elif direction == TrendDirection.UP: - return np.all(next_closes > lows[pos]) + return ( + np.all(next_closes > lows[pos]) + and np.all(previous_closes > lows[pos]) + and np.all(next_lows >= lows[pos]) + and np.all(previous_lows >= lows[pos]) + ) return False start_pos = 0 @@ -899,27 +929,24 @@ def zigzag( initial_high = highs[initial_high_pos] initial_low = lows[initial_low_pos] for i in range(start_pos + 1, len(df)): - if highs[i] > initial_high: - initial_high, initial_high_pos = highs[i], i - if lows[i] < initial_low: - initial_low, initial_low_pos = lows[i], i - if ( - i - initial_high_pos < confirmation_window - or i - initial_low_pos < confirmation_window - ): - continue - - initial_move_from_high = (initial_high - lows[i]) / initial_high - initial_move_from_low = (highs[i] - initial_low) / initial_low - if initial_move_from_high >= thresholds[i] and is_reversal_confirmed( - initial_high_pos, TrendDirection.DOWN - ): + current_high = highs[i] + current_low = lows[i] + if current_high > initial_high: + initial_high, initial_high_pos = current_high, i + if current_low < initial_low: + initial_low, initial_low_pos = current_low, i + + initial_move_from_high = (initial_high - current_low) / initial_high + initial_move_from_low = (current_high - initial_low) / initial_low + if initial_move_from_high >= thresholds[ + initial_high_pos + ] and is_reversal_confirmed(initial_high_pos, TrendDirection.DOWN): add_pivot(initial_high_pos, initial_high, TrendDirection.UP) state = TrendDirection.DOWN break - elif initial_move_from_low >= thresholds[i] and is_reversal_confirmed( - initial_low_pos, TrendDirection.UP - ): + elif initial_move_from_low >= thresholds[ + initial_low_pos + ] and is_reversal_confirmed(initial_low_pos, TrendDirection.UP): add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN) state = TrendDirection.UP break @@ -929,26 +956,30 @@ def zigzag( for i in range(last_pivot_pos + 1, len(df)): current_high = highs[i] current_low = lows[i] - last_pivot_val = pivots_values[-1] + if state == TrendDirection.UP: - if current_high > last_pivot_val: - update_last_pivot(i, current_high, TrendDirection.UP) - elif ( - (last_pivot_val - current_low) / last_pivot_val >= thresholds[i] - and (i - last_pivot_pos) > depth - and is_reversal_confirmed(i, TrendDirection.DOWN) + if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value: + update_candidate_pivot(i, current_high, TrendDirection.UP) + if ( + (candidate_pivot_value - current_low) / candidate_pivot_value + >= thresholds[candidate_pivot_pos] + and (candidate_pivot_pos - last_pivot_pos) >= depth + and is_reversal_confirmed(candidate_pivot_pos, TrendDirection.DOWN) ): - add_pivot(i, current_low, TrendDirection.DOWN) + add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP) state = TrendDirection.DOWN elif state == TrendDirection.DOWN: - if current_low < last_pivot_val: - update_last_pivot(i, current_low, TrendDirection.DOWN) - elif ( - (current_high - last_pivot_val) / last_pivot_val >= thresholds[i] - and (i - last_pivot_pos) > depth - and is_reversal_confirmed(i, TrendDirection.UP) + if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value: + update_candidate_pivot(i, current_low, TrendDirection.DOWN) + if ( + (current_high - candidate_pivot_value) / candidate_pivot_value + >= thresholds[candidate_pivot_pos] + and (candidate_pivot_pos - last_pivot_pos) >= depth + and is_reversal_confirmed(candidate_pivot_pos, TrendDirection.UP) ): - add_pivot(i, current_high, TrendDirection.UP) + add_pivot( + candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN + ) state = TrendDirection.UP return pivots_indices, pivots_values, pivots_directions diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index d2e8f88..07fadce 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -58,7 +58,7 @@ class QuickAdapterV3(IStrategy): INTERFACE_VERSION = 3 def version(self) -> str: - return "3.3.22" + return "3.3.23" timeframe = "5m" @@ -643,12 +643,15 @@ class QuickAdapterV3(IStrategy): last_candle_natr = last_candle["natr_label_period_candles"] if isna(last_candle_natr): return False - entry_price_fluctuation = ( - last_candle_close * last_candle_natr * self.get_entry_natr_ratio(pair) - ) + entry_natr_ratio = self.get_entry_natr_ratio(pair) if side == "long": - lower_bound = last_candle_low - entry_price_fluctuation - upper_bound = last_candle_close + entry_price_fluctuation + lower_bound = ( + last_candle_low - last_candle_low * last_candle_natr * entry_natr_ratio + ) + upper_bound = ( + last_candle_close + + last_candle_close * last_candle_natr * entry_natr_ratio + ) if lower_bound < 0: logger.info( f"User denied {side} entry for {pair}: calculated lower bound {lower_bound} is below zero" @@ -661,8 +664,14 @@ class QuickAdapterV3(IStrategy): f"User denied {side} entry for {pair}: rate {rate} outside bounds [{lower_bound}, {upper_bound}]" ) elif side == "short": - lower_bound = last_candle_close - entry_price_fluctuation - upper_bound = last_candle_high + entry_price_fluctuation + lower_bound = ( + last_candle_close + - last_candle_close * last_candle_natr * entry_natr_ratio + ) + upper_bound = ( + last_candle_high + + last_candle_high * last_candle_natr * entry_natr_ratio + ) if lower_bound < 0: logger.info( f"User denied {side} entry for {pair}: calculated lower bound {lower_bound} is below zero" diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 3111957..f0853f3 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -339,10 +339,10 @@ def zigzag( df: pd.DataFrame, natr_period: int = 14, natr_ratio: float = 1.0, - confirmation_window: int = 3, + confirmation_window: int = 2, depth: int = 12, ) -> tuple[list[int], list[float], list[int]]: - if df.empty or len(df) < natr_period + confirmation_window: + if df.empty or len(df) < max(natr_period, 2 * confirmation_window + 1): return [], [], [] indices = df.index.tolist() @@ -354,8 +354,19 @@ def zigzag( lows = df["low"].values state: TrendDirection = TrendDirection.NEUTRAL - pivots_indices, pivots_values, pivots_directions = [], [], [] + last_pivot_pos = -depth - 1 + pivots_indices, pivots_values, pivots_directions = [], [], [] + + candidate_pivot_pos = -1 + candidate_pivot_value = np.nan + candidate_pivot_direction: TrendDirection = TrendDirection.NEUTRAL + + def update_candidate_pivot(pos: int, value: float, direction: TrendDirection): + nonlocal candidate_pivot_pos, candidate_pivot_value, candidate_pivot_direction + candidate_pivot_pos = pos + candidate_pivot_value = value + candidate_pivot_direction = direction def add_pivot(pos: int, value: float, direction: TrendDirection): nonlocal last_pivot_pos @@ -365,21 +376,40 @@ def zigzag( pivots_values.append(value) pivots_directions.append(direction) last_pivot_pos = pos - - def update_last_pivot(pos: int, value: float, direction: TrendDirection): - if pivots_indices and indices[pos] != pivots_indices[-1]: - pivots_indices[-1] = indices[pos] - pivots_values[-1] = value - pivots_directions[-1] = direction + update_candidate_pivot( + pos, + value, + TrendDirection.UP + if direction == TrendDirection.DOWN + else TrendDirection.DOWN, + ) def is_reversal_confirmed(pos: int, direction: TrendDirection) -> bool: - if pos + confirmation_window >= len(df): + if pos - confirmation_window < 0 or pos + confirmation_window >= len(df): return False - next_closes = closes[pos + 1 : pos + confirmation_window + 1] + next_slice = slice(pos + 1, pos + confirmation_window + 1) + next_closes = closes[next_slice] + next_highs = highs[next_slice] + next_lows = lows[next_slice] + previous_slice = slice(pos - confirmation_window, pos) + previous_closes = closes[previous_slice] + previous_highs = highs[previous_slice] + previous_lows = lows[previous_slice] + if direction == TrendDirection.DOWN: - return np.all(next_closes < highs[pos]) + return ( + np.all(next_closes < highs[pos]) + and np.all(previous_closes < highs[pos]) + and np.all(next_highs <= highs[pos]) + and np.all(previous_highs <= highs[pos]) + ) elif direction == TrendDirection.UP: - return np.all(next_closes > lows[pos]) + return ( + np.all(next_closes > lows[pos]) + and np.all(previous_closes > lows[pos]) + and np.all(next_lows >= lows[pos]) + and np.all(previous_lows >= lows[pos]) + ) return False start_pos = 0 @@ -388,27 +418,24 @@ def zigzag( initial_high = highs[initial_high_pos] initial_low = lows[initial_low_pos] for i in range(start_pos + 1, len(df)): - if highs[i] > initial_high: - initial_high, initial_high_pos = highs[i], i - if lows[i] < initial_low: - initial_low, initial_low_pos = lows[i], i - if ( - i - initial_high_pos < confirmation_window - or i - initial_low_pos < confirmation_window - ): - continue - - initial_move_from_high = (initial_high - lows[i]) / initial_high - initial_move_from_low = (highs[i] - initial_low) / initial_low - if initial_move_from_high >= thresholds[i] and is_reversal_confirmed( - initial_high_pos, TrendDirection.DOWN - ): + current_high = highs[i] + current_low = lows[i] + if current_high > initial_high: + initial_high, initial_high_pos = current_high, i + if current_low < initial_low: + initial_low, initial_low_pos = current_low, i + + initial_move_from_high = (initial_high - current_low) / initial_high + initial_move_from_low = (current_high - initial_low) / initial_low + if initial_move_from_high >= thresholds[ + initial_high_pos + ] and is_reversal_confirmed(initial_high_pos, TrendDirection.DOWN): add_pivot(initial_high_pos, initial_high, TrendDirection.UP) state = TrendDirection.DOWN break - elif initial_move_from_low >= thresholds[i] and is_reversal_confirmed( - initial_low_pos, TrendDirection.UP - ): + elif initial_move_from_low >= thresholds[ + initial_low_pos + ] and is_reversal_confirmed(initial_low_pos, TrendDirection.UP): add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN) state = TrendDirection.UP break @@ -418,26 +445,30 @@ def zigzag( for i in range(last_pivot_pos + 1, len(df)): current_high = highs[i] current_low = lows[i] - last_pivot_val = pivots_values[-1] + if state == TrendDirection.UP: - if current_high > last_pivot_val: - update_last_pivot(i, current_high, TrendDirection.UP) - elif ( - (last_pivot_val - current_low) / last_pivot_val >= thresholds[i] - and (i - last_pivot_pos) > depth - and is_reversal_confirmed(i, TrendDirection.DOWN) + if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value: + update_candidate_pivot(i, current_high, TrendDirection.UP) + if ( + (candidate_pivot_value - current_low) / candidate_pivot_value + >= thresholds[candidate_pivot_pos] + and (candidate_pivot_pos - last_pivot_pos) >= depth + and is_reversal_confirmed(candidate_pivot_pos, TrendDirection.DOWN) ): - add_pivot(i, current_low, TrendDirection.DOWN) + add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP) state = TrendDirection.DOWN elif state == TrendDirection.DOWN: - if current_low < last_pivot_val: - update_last_pivot(i, current_low, TrendDirection.DOWN) - elif ( - (current_high - last_pivot_val) / last_pivot_val >= thresholds[i] - and (i - last_pivot_pos) > depth - and is_reversal_confirmed(i, TrendDirection.UP) + if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value: + update_candidate_pivot(i, current_low, TrendDirection.DOWN) + if ( + (current_high - candidate_pivot_value) / candidate_pivot_value + >= thresholds[candidate_pivot_pos] + and (candidate_pivot_pos - last_pivot_pos) >= depth + and is_reversal_confirmed(candidate_pivot_pos, TrendDirection.UP) ): - add_pivot(i, current_high, TrendDirection.UP) + add_pivot( + candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN + ) state = TrendDirection.UP return pivots_indices, pivots_values, pivots_directions -- 2.43.0