From 987eba7f425aff83bf0f67b2138bf9eae631e1aa Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 15 Jun 2025 16:26:57 +0200 Subject: [PATCH] fix(qav3): finally fix pivot labeling logic to not fail MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../freqaimodels/QuickAdapterRegressorV3.py | 234 +++++------------- .../user_data/strategies/QuickAdapterV3.py | 2 +- quickadapter/user_data/strategies/Utils.py | 234 +++++------------- 3 files changed, 123 insertions(+), 347 deletions(-) diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index c25d1fd..33db07b 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -49,7 +49,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): https://github.com/sponsors/robcaulk """ - version = "3.7.88" + version = "3.7.89" @cached_property def _optuna_config(self) -> dict: @@ -1270,14 +1270,9 @@ def zigzag( df: pd.DataFrame, natr_period: int = 14, natr_ratio: float = 6.0, - use_local_extrema_validation=False, - use_slope_validation=False, - use_significant_move_away_validation=False, ) -> tuple[list[int], list[float], list[int], list[float]]: - min_confirmation_window: int = 3 - max_confirmation_window: int = 5 n = len(df) - if df.empty or n < max(natr_period, 2 * max_confirmation_window + 1): + if df.empty or n < natr_period: return [], [], [], [] natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy() @@ -1289,20 +1284,15 @@ def zigzag( lows = df.get("low").to_numpy() state: TrendDirection = TrendDirection.NEUTRAL - depth = -1 - pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs = ( - [], - [], - [], - [], - ) + pivots_indices: list[int] = [] + pivots_values: list[float] = [] + pivots_directions: list[int] = [] + pivots_scaled_natrs: list[float] = [] last_pivot_pos = -1 candidate_pivot_pos: int = -1 candidate_pivot_value: float = np.nan - candidate_pivot_direction: TrendDirection = TrendDirection.NEUTRAL - candidate_pivot_scaled_natr: float = np.nan volatility_quantile_cache: dict[int, float] = {} @@ -1319,10 +1309,10 @@ def zigzag( return volatility_quantile_cache[pos] - def calculate_confirmation_window( + def calculate_slope_confirmation_window( pos: int, - min_window: int = min_confirmation_window, - max_window: int = max_confirmation_window, + min_window: int = 3, + max_window: int = 5, ) -> int: volatility_quantile = calculate_volatility_quantile(pos) if np.isnan(volatility_quantile): @@ -1334,57 +1324,19 @@ def zigzag( max_window, ).astype(int) - def calculate_depth( - pos: int, - min_depth: int = 6, - max_depth: int = 24, - ) -> int: - volatility_quantile = calculate_volatility_quantile(pos) - if np.isnan(volatility_quantile): - return int(round(median([min_depth, max_depth]))) - - return np.clip( - round(max_depth - (max_depth - min_depth) * volatility_quantile), - min_depth, - max_depth, - ).astype(int) - - def calculate_min_slope_strength( - pos: int, - min_strength: float = 0.5, - max_strength: float = 1.5, - ) -> float: - volatility_quantile = calculate_volatility_quantile(pos) - if np.isnan(volatility_quantile): - return median([min_strength, max_strength]) - - return min_strength + (max_strength - min_strength) * volatility_quantile - - def update_candidate_pivot(pos: int, value: float, direction: TrendDirection): - nonlocal \ - candidate_pivot_pos, \ - candidate_pivot_value, \ - candidate_pivot_direction, \ - candidate_pivot_scaled_natr + def update_candidate_pivot(pos: int, value: float): + nonlocal candidate_pivot_pos, candidate_pivot_value if 0 <= pos < n: candidate_pivot_pos = pos candidate_pivot_value = value - candidate_pivot_direction = direction - candidate_pivot_scaled_natr = thresholds[pos] def reset_candidate_pivot(): - nonlocal \ - candidate_pivot_pos, \ - candidate_pivot_value, \ - candidate_pivot_direction, \ - candidate_pivot_scaled_natr + nonlocal candidate_pivot_pos, candidate_pivot_value candidate_pivot_pos = -1 candidate_pivot_value = np.nan - candidate_pivot_direction = TrendDirection.NEUTRAL - candidate_pivot_scaled_natr = np.nan def add_pivot(pos: int, value: float, direction: TrendDirection): - nonlocal last_pivot_pos, depth + nonlocal last_pivot_pos if pivots_indices and indices[pos] == pivots_indices[-1]: return pivots_indices.append(indices[pos]) @@ -1392,112 +1344,52 @@ def zigzag( pivots_directions.append(direction) pivots_scaled_natrs.append(thresholds[pos]) last_pivot_pos = pos - depth = calculate_depth(pos) reset_candidate_pivot() - def is_reversal_confirmed( + def is_pivot_confirmed( candidate_pivot_pos: int, - confirmation_start_pos: int, direction: TrendDirection, - move_away_ratio: float = 0.25, + min_slope: float = np.finfo(float).eps, + enable_weighting: bool = False, + slopes_ok_threshold: float = 0.75, ) -> bool: - confirmation_window = calculate_confirmation_window(candidate_pivot_pos) - - next_start = candidate_pivot_pos + 1 - next_end = min(next_start + confirmation_window, n) - next_moving_start = confirmation_start_pos + 1 - next_moving_end = min(next_moving_start + confirmation_window, n) - previous_start = max(candidate_pivot_pos - confirmation_window, 0) - previous_end = candidate_pivot_pos - if ( - next_start >= next_end - or next_moving_start >= next_moving_end - or previous_start >= previous_end - ): - return False + slope_confirmation_window = calculate_slope_confirmation_window( + candidate_pivot_pos + ) - next_slice = slice(next_start, next_end) - next_highs = highs[next_slice] - next_lows = lows[next_slice] - next_moving_slice = slice(next_moving_start, next_moving_end) - next_moving_closes = closes[next_moving_slice] - next_moving_highs = highs[next_moving_slice] - next_moving_lows = lows[next_moving_slice] - previous_slice = slice(previous_start, previous_end) - previous_highs = highs[previous_slice] - previous_lows = lows[previous_slice] - - local_extrema_ok = True - if use_local_extrema_validation: - local_extrema_threshold = (confirmation_window - 1) / confirmation_window - - if direction == TrendDirection.DOWN: - valid_next = ( - np.sum(next_highs < highs[candidate_pivot_pos]) / len(next_highs) - >= local_extrema_threshold - ) - valid_previous = ( - np.sum(previous_highs < highs[candidate_pivot_pos]) - / len(previous_highs) - >= local_extrema_threshold - ) - local_extrema_ok = valid_next and valid_previous - elif direction == TrendDirection.UP: - valid_next = ( - np.sum(next_lows > lows[candidate_pivot_pos]) / len(next_lows) - >= local_extrema_threshold - ) - valid_previous = ( - np.sum(previous_lows > lows[candidate_pivot_pos]) - / len(previous_lows) - >= local_extrema_threshold - ) - local_extrema_ok = valid_next and valid_previous - - slope_ok = True - if use_slope_validation and len(next_moving_closes) >= 2: - log_next_moving_closes = np.log(next_moving_closes) - log_next_moving_closes_std = np.std(log_next_moving_closes) - if np.isclose(log_next_moving_closes_std, 0): - next_moving_slope_strength = 0 - else: - log_next_moving_closes_length = len(log_next_moving_closes) - weights = np.linspace(0.5, 1.5, log_next_moving_closes_length) - log_next_moving_slope = np.polyfit( - range(log_next_moving_closes_length), - log_next_moving_closes, + slopes_ok: list[bool] = [] + + for i in range(candidate_pivot_pos + 1, n): + next_start = i + next_end = min(next_start + slope_confirmation_window, n) + + next_closes = closes[next_start:next_end] + + if len(next_closes) >= 2: + log_next_closes = np.log(next_closes) + log_next_closes_length = len(log_next_closes) + polyfit_kwargs = {} + if enable_weighting: + polyfit_kwargs = { + "w": np.linspace(0.5, 1.5, log_next_closes_length) + } + log_next_slope = np.polyfit( + range(log_next_closes_length), + log_next_closes, 1, - w=weights, + **polyfit_kwargs, )[0] - next_moving_slope_strength = ( - log_next_moving_slope / log_next_moving_closes_std - ) - min_slope_strength = calculate_min_slope_strength(candidate_pivot_pos) - if direction == TrendDirection.DOWN: - slope_ok = next_moving_slope_strength < -min_slope_strength - elif direction == TrendDirection.UP: - slope_ok = next_moving_slope_strength > min_slope_strength - - if use_significant_move_away_validation: - significant_move_away_ok = False - if direction == TrendDirection.DOWN: - if np.any( - next_moving_lows - < highs[candidate_pivot_pos] - * (1 - thresholds[candidate_pivot_pos] * move_away_ratio) - ): - significant_move_away_ok = True - elif direction == TrendDirection.UP: - if np.any( - next_moving_highs - > lows[candidate_pivot_pos] - * (1 + thresholds[candidate_pivot_pos] * move_away_ratio) - ): - significant_move_away_ok = True - else: - significant_move_away_ok = True + if direction == TrendDirection.DOWN: + slopes_ok.append(log_next_slope < -min_slope) + elif direction == TrendDirection.UP: + slopes_ok.append(log_next_slope > min_slope) + else: + slopes_ok.append(False) - return local_extrema_ok and slope_ok and significant_move_away_ok + if sum(slopes_ok) / len(slopes_ok) >= slopes_ok_threshold: + return True + + return False start_pos = 0 initial_high_pos = start_pos @@ -1541,33 +1433,29 @@ def zigzag( else: return [], [], [], [] - if n - last_pivot_pos - 1 < depth: - return pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs - for i in range(last_pivot_pos + 1, n): current_high = highs[i] current_low = lows[i] if state == TrendDirection.UP: if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value: - update_candidate_pivot(i, current_high, TrendDirection.UP) + update_candidate_pivot(i, current_high) if ( - (candidate_pivot_value - current_low) / candidate_pivot_value - >= thresholds[candidate_pivot_pos] - and (candidate_pivot_pos - last_pivot_pos) >= depth - and is_reversal_confirmed(candidate_pivot_pos, i, TrendDirection.DOWN) - ): + candidate_pivot_value - current_low + ) / candidate_pivot_value >= thresholds[ + candidate_pivot_pos + ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.DOWN): add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP) state = TrendDirection.DOWN + elif state == TrendDirection.DOWN: if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value: - update_candidate_pivot(i, current_low, TrendDirection.DOWN) + update_candidate_pivot(i, current_low) if ( - (current_high - candidate_pivot_value) / candidate_pivot_value - >= thresholds[candidate_pivot_pos] - and (candidate_pivot_pos - last_pivot_pos) >= depth - and is_reversal_confirmed(candidate_pivot_pos, i, TrendDirection.UP) - ): + current_high - candidate_pivot_value + ) / candidate_pivot_value >= thresholds[ + candidate_pivot_pos + ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.UP): add_pivot( candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN ) diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index 2480a74..e7ebbdb 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -64,7 +64,7 @@ class QuickAdapterV3(IStrategy): INTERFACE_VERSION = 3 def version(self) -> str: - return "3.3.92" + return "3.3.93" timeframe = "5m" diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 7607425..e9d144a 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -394,14 +394,9 @@ def zigzag( df: pd.DataFrame, natr_period: int = 14, natr_ratio: float = 6.0, - use_local_extrema_validation=False, - use_slope_validation=False, - use_significant_move_away_validation=False, ) -> tuple[list[int], list[float], list[int], list[float]]: - min_confirmation_window: int = 3 - max_confirmation_window: int = 5 n = len(df) - if df.empty or n < max(natr_period, 2 * max_confirmation_window + 1): + if df.empty or n < natr_period: return [], [], [], [] natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy() @@ -413,20 +408,15 @@ def zigzag( lows = df.get("low").to_numpy() state: TrendDirection = TrendDirection.NEUTRAL - depth = -1 - pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs = ( - [], - [], - [], - [], - ) + pivots_indices: list[int] = [] + pivots_values: list[float] = [] + pivots_directions: list[int] = [] + pivots_scaled_natrs: list[float] = [] last_pivot_pos = -1 candidate_pivot_pos: int = -1 candidate_pivot_value: float = np.nan - candidate_pivot_direction: TrendDirection = TrendDirection.NEUTRAL - candidate_pivot_scaled_natr: float = np.nan volatility_quantile_cache: dict[int, float] = {} @@ -443,10 +433,10 @@ def zigzag( return volatility_quantile_cache[pos] - def calculate_confirmation_window( + def calculate_slope_confirmation_window( pos: int, - min_window: int = min_confirmation_window, - max_window: int = max_confirmation_window, + min_window: int = 3, + max_window: int = 5, ) -> int: volatility_quantile = calculate_volatility_quantile(pos) if np.isnan(volatility_quantile): @@ -458,57 +448,19 @@ def zigzag( max_window, ).astype(int) - def calculate_depth( - pos: int, - min_depth: int = 6, - max_depth: int = 24, - ) -> int: - volatility_quantile = calculate_volatility_quantile(pos) - if np.isnan(volatility_quantile): - return int(round(median([min_depth, max_depth]))) - - return np.clip( - round(max_depth - (max_depth - min_depth) * volatility_quantile), - min_depth, - max_depth, - ).astype(int) - - def calculate_min_slope_strength( - pos: int, - min_strength: float = 0.5, - max_strength: float = 1.5, - ) -> float: - volatility_quantile = calculate_volatility_quantile(pos) - if np.isnan(volatility_quantile): - return median([min_strength, max_strength]) - - return min_strength + (max_strength - min_strength) * volatility_quantile - - def update_candidate_pivot(pos: int, value: float, direction: TrendDirection): - nonlocal \ - candidate_pivot_pos, \ - candidate_pivot_value, \ - candidate_pivot_direction, \ - candidate_pivot_scaled_natr + def update_candidate_pivot(pos: int, value: float): + nonlocal candidate_pivot_pos, candidate_pivot_value if 0 <= pos < n: candidate_pivot_pos = pos candidate_pivot_value = value - candidate_pivot_direction = direction - candidate_pivot_scaled_natr = thresholds[pos] def reset_candidate_pivot(): - nonlocal \ - candidate_pivot_pos, \ - candidate_pivot_value, \ - candidate_pivot_direction, \ - candidate_pivot_scaled_natr + nonlocal candidate_pivot_pos, candidate_pivot_value candidate_pivot_pos = -1 candidate_pivot_value = np.nan - candidate_pivot_direction = TrendDirection.NEUTRAL - candidate_pivot_scaled_natr = np.nan def add_pivot(pos: int, value: float, direction: TrendDirection): - nonlocal last_pivot_pos, depth + nonlocal last_pivot_pos if pivots_indices and indices[pos] == pivots_indices[-1]: return pivots_indices.append(indices[pos]) @@ -516,112 +468,52 @@ def zigzag( pivots_directions.append(direction) pivots_scaled_natrs.append(thresholds[pos]) last_pivot_pos = pos - depth = calculate_depth(pos) reset_candidate_pivot() - def is_reversal_confirmed( + def is_pivot_confirmed( candidate_pivot_pos: int, - confirmation_start_pos: int, direction: TrendDirection, - move_away_ratio: float = 0.25, + min_slope: float = np.finfo(float).eps, + enable_weighting: bool = False, + slopes_ok_threshold: float = 0.75, ) -> bool: - confirmation_window = calculate_confirmation_window(candidate_pivot_pos) - - next_start = candidate_pivot_pos + 1 - next_end = min(next_start + confirmation_window, n) - next_moving_start = confirmation_start_pos + 1 - next_moving_end = min(next_moving_start + confirmation_window, n) - previous_start = max(candidate_pivot_pos - confirmation_window, 0) - previous_end = candidate_pivot_pos - if ( - next_start >= next_end - or next_moving_start >= next_moving_end - or previous_start >= previous_end - ): - return False - - next_slice = slice(next_start, next_end) - next_highs = highs[next_slice] - next_lows = lows[next_slice] - next_moving_slice = slice(next_moving_start, next_moving_end) - next_moving_closes = closes[next_moving_slice] - next_moving_highs = highs[next_moving_slice] - next_moving_lows = lows[next_moving_slice] - previous_slice = slice(previous_start, previous_end) - previous_highs = highs[previous_slice] - previous_lows = lows[previous_slice] - - local_extrema_ok = True - if use_local_extrema_validation: - local_extrema_threshold = (confirmation_window - 1) / confirmation_window - - if direction == TrendDirection.DOWN: - valid_next = ( - np.sum(next_highs < highs[candidate_pivot_pos]) / len(next_highs) - >= local_extrema_threshold - ) - valid_previous = ( - np.sum(previous_highs < highs[candidate_pivot_pos]) - / len(previous_highs) - >= local_extrema_threshold - ) - local_extrema_ok = valid_next and valid_previous - elif direction == TrendDirection.UP: - valid_next = ( - np.sum(next_lows > lows[candidate_pivot_pos]) / len(next_lows) - >= local_extrema_threshold - ) - valid_previous = ( - np.sum(previous_lows > lows[candidate_pivot_pos]) - / len(previous_lows) - >= local_extrema_threshold - ) - local_extrema_ok = valid_next and valid_previous - - slope_ok = True - if use_slope_validation and len(next_moving_closes) >= 2: - log_next_moving_closes = np.log(next_moving_closes) - log_next_moving_closes_std = np.std(log_next_moving_closes) - if np.isclose(log_next_moving_closes_std, 0): - next_moving_slope_strength = 0 - else: - log_next_moving_closes_length = len(log_next_moving_closes) - weights = np.linspace(0.5, 1.5, log_next_moving_closes_length) - log_next_moving_slope = np.polyfit( - range(log_next_moving_closes_length), - log_next_moving_closes, + slope_confirmation_window = calculate_slope_confirmation_window( + candidate_pivot_pos + ) + + slopes_ok: list[bool] = [] + + for i in range(candidate_pivot_pos + 1, n): + next_start = i + next_end = min(next_start + slope_confirmation_window, n) + + next_closes = closes[next_start:next_end] + + if len(next_closes) >= 2: + log_next_closes = np.log(next_closes) + log_next_closes_length = len(log_next_closes) + polyfit_kwargs = {} + if enable_weighting: + polyfit_kwargs = { + "w": np.linspace(0.5, 1.5, log_next_closes_length) + } + log_next_slope = np.polyfit( + range(log_next_closes_length), + log_next_closes, 1, - w=weights, + **polyfit_kwargs, )[0] - next_moving_slope_strength = ( - log_next_moving_slope / log_next_moving_closes_std - ) - min_slope_strength = calculate_min_slope_strength(candidate_pivot_pos) - if direction == TrendDirection.DOWN: - slope_ok = next_moving_slope_strength < -min_slope_strength - elif direction == TrendDirection.UP: - slope_ok = next_moving_slope_strength > min_slope_strength - - if use_significant_move_away_validation: - significant_move_away_ok = False - if direction == TrendDirection.DOWN: - if np.any( - next_moving_lows - < highs[candidate_pivot_pos] - * (1 - thresholds[candidate_pivot_pos] * move_away_ratio) - ): - significant_move_away_ok = True - elif direction == TrendDirection.UP: - if np.any( - next_moving_highs - > lows[candidate_pivot_pos] - * (1 + thresholds[candidate_pivot_pos] * move_away_ratio) - ): - significant_move_away_ok = True - else: - significant_move_away_ok = True + if direction == TrendDirection.DOWN: + slopes_ok.append(log_next_slope < -min_slope) + elif direction == TrendDirection.UP: + slopes_ok.append(log_next_slope > min_slope) + else: + slopes_ok.append(False) - return local_extrema_ok and slope_ok and significant_move_away_ok + if sum(slopes_ok) / len(slopes_ok) >= slopes_ok_threshold: + return True + + return False start_pos = 0 initial_high_pos = start_pos @@ -665,33 +557,29 @@ def zigzag( else: return [], [], [], [] - if n - last_pivot_pos - 1 < depth: - return pivots_indices, pivots_values, pivots_directions, pivots_scaled_natrs - for i in range(last_pivot_pos + 1, n): current_high = highs[i] current_low = lows[i] if state == TrendDirection.UP: if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value: - update_candidate_pivot(i, current_high, TrendDirection.UP) + update_candidate_pivot(i, current_high) if ( - (candidate_pivot_value - current_low) / candidate_pivot_value - >= thresholds[candidate_pivot_pos] - and (candidate_pivot_pos - last_pivot_pos) >= depth - and is_reversal_confirmed(candidate_pivot_pos, i, TrendDirection.DOWN) - ): + candidate_pivot_value - current_low + ) / candidate_pivot_value >= thresholds[ + candidate_pivot_pos + ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.DOWN): add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP) state = TrendDirection.DOWN + elif state == TrendDirection.DOWN: if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value: - update_candidate_pivot(i, current_low, TrendDirection.DOWN) + update_candidate_pivot(i, current_low) if ( - (current_high - candidate_pivot_value) / candidate_pivot_value - >= thresholds[candidate_pivot_pos] - and (candidate_pivot_pos - last_pivot_pos) >= depth - and is_reversal_confirmed(candidate_pivot_pos, i, TrendDirection.UP) - ): + current_high - candidate_pivot_value + ) / candidate_pivot_value >= thresholds[ + candidate_pivot_pos + ] and is_pivot_confirmed(candidate_pivot_pos, TrendDirection.UP): add_pivot( candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN ) -- 2.43.0