From: Jérôme Benoit Date: Thu, 1 May 2025 20:33:32 +0000 (+0200) Subject: feat(qav3): add fractals filtering to reversal pivots labeling X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=62431684ce4cbf676e08914cd52a5be0c2384a92;p=freqai-strategies.git feat(qav3): add fractals filtering to reversal pivots labeling Signed-off-by: Jérôme Benoit --- diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index 3c3bc52..5919ae3 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -45,7 +45,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): https://github.com/sponsors/robcaulk """ - version = "3.7.22" + version = "3.7.23" @cached_property def _optuna_config(self) -> dict: @@ -846,17 +846,48 @@ class TrendDirection(IntEnum): DOWN = -1 +def find_fractals(df: pd.DataFrame, fractal_period: int) -> tuple[list[int], list[int]]: + highs = df["high"].values + lows = df["low"].values + fractal_highs = [] + fractal_lows = [] + for i in range(fractal_period, max(fractal_period, len(df) - fractal_period)): + valid_high = True + valid_low = True + for j in range(1, fractal_period + 1): + if highs[i] <= highs[i - j] or highs[i] <= highs[i + j]: + valid_high = False + if lows[i] >= lows[i - j] or lows[i] >= lows[i + j]: + valid_low = False + if not valid_high and not valid_low: + break + if valid_high: + fractal_highs.append(i) + if valid_low: + fractal_lows.append(i) + return fractal_highs, fractal_lows + + def zigzag( df: pd.DataFrame, - period: int = 14, - ratio: float = 1.0, + natr_period: int = 14, + natr_ratio: float = 1.0, + fractal_period: int = 2, depth: int = 12, ) -> tuple[list[int], list[float], list[int]]: if df.empty or len(df) < 2: return [], [], [] + fractal_highs, fractal_lows = find_fractals(df, fractal_period) + fractal_high_set = set(fractal_highs) + fractal_low_set = set(fractal_lows) + is_fractal_high = [i in fractal_high_set for i in range(len(df))] + is_fractal_low = [i in fractal_low_set for i in range(len(df))] + indices = df.index.tolist() - thresholds = (ta.NATR(df, timeperiod=period) * ratio).fillna(method="bfill").values + thresholds = ( + (ta.NATR(df, timeperiod=natr_period) * natr_ratio).fillna(method="bfill").values + ) highs = df["high"].values lows = df["low"].values @@ -877,11 +908,12 @@ def zigzag( pivots_values[-1] = value pivots_directions[-1] = direction - initial_high_pos = 0 - initial_low_pos = 0 + start_pos = fractal_period + initial_high_pos = start_pos + initial_low_pos = start_pos initial_high = highs[initial_high_pos] initial_low = lows[initial_low_pos] - for i in range(1, len(df)): + for i in range(start_pos + 1, len(df)): if highs[i] > initial_high: initial_high, initial_high_pos = highs[i], i if lows[i] < initial_low: @@ -889,11 +921,14 @@ def zigzag( initial_move_from_high = (initial_high - lows[i]) / initial_high initial_move_from_low = (highs[i] - initial_low) / initial_low - if initial_move_from_high >= thresholds[i]: + if ( + initial_move_from_high >= thresholds[i] + and is_fractal_high[initial_high_pos] + ): add_pivot(initial_high_pos, initial_high, TrendDirection.UP) state = TrendDirection.DOWN break - elif initial_move_from_low >= thresholds[i]: + elif initial_move_from_low >= thresholds[i] and is_fractal_low[initial_low_pos]: add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN) state = TrendDirection.UP break @@ -903,40 +938,48 @@ def zigzag( for i in range(i + 1, len(df)): last_pivot_val = pivots_values[-1] if state == TrendDirection.UP: - if highs[i] > last_pivot_val: + if highs[i] > last_pivot_val and is_fractal_high[i]: update_last_pivot(i, highs[i], TrendDirection.UP) - elif (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i] and ( - i - last_pivot_pos - ) >= depth: + elif ( + (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i] + and (i - last_pivot_pos) >= depth + and is_fractal_low[i] + ): add_pivot(i, lows[i], TrendDirection.DOWN) state = TrendDirection.DOWN elif state == TrendDirection.DOWN: - if lows[i] < last_pivot_val: + if lows[i] < last_pivot_val and is_fractal_low[i]: update_last_pivot(i, lows[i], TrendDirection.DOWN) - elif (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i] and ( - i - last_pivot_pos - ) >= depth: + elif ( + (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i] + and (i - last_pivot_pos) >= depth + and is_fractal_high[i] + ): add_pivot(i, highs[i], TrendDirection.UP) state = TrendDirection.UP final_pos = len(df) - 1 - if state != TrendDirection.NEUTRAL and (final_pos - last_pivot_pos) >= depth: - last_pivot_val = pivots_values[-1] - price_move = ( - (highs[final_pos] - last_pivot_val) / last_pivot_val - if state == TrendDirection.UP - else (last_pivot_val - lows[final_pos]) / last_pivot_val + last_pivot_val = pivots_values[-1] + final_price_move = ( + (highs[final_pos] - last_pivot_val) / last_pivot_val + if state == TrendDirection.UP + else (last_pivot_val - lows[final_pos]) / last_pivot_val + ) + if ( + state != TrendDirection.NEUTRAL + and (final_pos - last_pivot_pos) >= depth + and final_price_move >= thresholds[final_pos] + and indices[final_pos] != pivots_indices[-1] + and ( + (state == TrendDirection.UP and is_fractal_high[final_pos]) + or (state == TrendDirection.DOWN and is_fractal_low[final_pos]) + ) + ): + add_pivot( + final_pos, + highs[final_pos] if state == TrendDirection.UP else lows[final_pos], + state, ) - - if ( - price_move >= thresholds[final_pos] - and indices[final_pos] != pivots_indices[-1] - ): - add_pivot( - final_pos, - highs[final_pos] if state == TrendDirection.UP else lows[final_pos], - state, - ) return pivots_indices, pivots_values, pivots_directions @@ -974,8 +1017,8 @@ def label_objective( _, pivots_values, _ = zigzag( df, - period=label_period_candles, - ratio=label_natr_ratio, + natr_period=label_period_candles, + natr_ratio=label_natr_ratio, ) if len(pivots_values) < 2: diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index 07ddaae..9caab9d 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -58,7 +58,7 @@ class QuickAdapterV3(IStrategy): INTERFACE_VERSION = 3 def version(self) -> str: - return "3.3.16" + return "3.3.17" timeframe = "5m" @@ -378,8 +378,8 @@ class QuickAdapterV3(IStrategy): pair = str(metadata.get("pair")) pivots_indices, _, pivots_directions = zigzag( dataframe, - period=self.get_label_period_candles(pair), - ratio=self.get_label_natr_ratio(pair), + natr_period=self.get_label_period_candles(pair), + natr_ratio=self.get_label_natr_ratio(pair), ) dataframe[EXTREMA_COLUMN] = 0 for pivot_idx, pivot_dir in zip(pivots_indices, pivots_directions): diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 2e56ccc..d4fafe8 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -305,17 +305,48 @@ class TrendDirection(IntEnum): DOWN = -1 +def find_fractals(df: pd.DataFrame, fractal_period: int) -> tuple[list[int], list[int]]: + highs = df["high"].values + lows = df["low"].values + fractal_highs = [] + fractal_lows = [] + for i in range(fractal_period, max(fractal_period, len(df) - fractal_period)): + valid_high = True + valid_low = True + for j in range(1, fractal_period + 1): + if highs[i] <= highs[i - j] or highs[i] <= highs[i + j]: + valid_high = False + if lows[i] >= lows[i - j] or lows[i] >= lows[i + j]: + valid_low = False + if not valid_high and not valid_low: + break + if valid_high: + fractal_highs.append(i) + if valid_low: + fractal_lows.append(i) + return fractal_highs, fractal_lows + + def zigzag( df: pd.DataFrame, - period: int = 14, - ratio: float = 1.0, + natr_period: int = 14, + natr_ratio: float = 1.0, + fractal_period: int = 2, depth: int = 12, ) -> tuple[list[int], list[float], list[int]]: if df.empty or len(df) < 2: return [], [], [] + fractal_highs, fractal_lows = find_fractals(df, fractal_period) + fractal_high_set = set(fractal_highs) + fractal_low_set = set(fractal_lows) + is_fractal_high = [i in fractal_high_set for i in range(len(df))] + is_fractal_low = [i in fractal_low_set for i in range(len(df))] + indices = df.index.tolist() - thresholds = (ta.NATR(df, timeperiod=period) * ratio).fillna(method="bfill").values + thresholds = ( + (ta.NATR(df, timeperiod=natr_period) * natr_ratio).fillna(method="bfill").values + ) highs = df["high"].values lows = df["low"].values @@ -336,11 +367,12 @@ def zigzag( pivots_values[-1] = value pivots_directions[-1] = direction - initial_high_pos = 0 - initial_low_pos = 0 + start_pos = fractal_period + initial_high_pos = start_pos + initial_low_pos = start_pos initial_high = highs[initial_high_pos] initial_low = lows[initial_low_pos] - for i in range(1, len(df)): + for i in range(start_pos + 1, len(df)): if highs[i] > initial_high: initial_high, initial_high_pos = highs[i], i if lows[i] < initial_low: @@ -348,11 +380,14 @@ def zigzag( initial_move_from_high = (initial_high - lows[i]) / initial_high initial_move_from_low = (highs[i] - initial_low) / initial_low - if initial_move_from_high >= thresholds[i]: + if ( + initial_move_from_high >= thresholds[i] + and is_fractal_high[initial_high_pos] + ): add_pivot(initial_high_pos, initial_high, TrendDirection.UP) state = TrendDirection.DOWN break - elif initial_move_from_low >= thresholds[i]: + elif initial_move_from_low >= thresholds[i] and is_fractal_low[initial_low_pos]: add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN) state = TrendDirection.UP break @@ -362,39 +397,47 @@ def zigzag( for i in range(i + 1, len(df)): last_pivot_val = pivots_values[-1] if state == TrendDirection.UP: - if highs[i] > last_pivot_val: + if highs[i] > last_pivot_val and is_fractal_high[i]: update_last_pivot(i, highs[i], TrendDirection.UP) - elif (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i] and ( - i - last_pivot_pos - ) >= depth: + elif ( + (last_pivot_val - lows[i]) / last_pivot_val >= thresholds[i] + and (i - last_pivot_pos) >= depth + and is_fractal_low[i] + ): add_pivot(i, lows[i], TrendDirection.DOWN) state = TrendDirection.DOWN elif state == TrendDirection.DOWN: - if lows[i] < last_pivot_val: + if lows[i] < last_pivot_val and is_fractal_low[i]: update_last_pivot(i, lows[i], TrendDirection.DOWN) - elif (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i] and ( - i - last_pivot_pos - ) >= depth: + elif ( + (highs[i] - last_pivot_val) / last_pivot_val >= thresholds[i] + and (i - last_pivot_pos) >= depth + and is_fractal_high[i] + ): add_pivot(i, highs[i], TrendDirection.UP) state = TrendDirection.UP final_pos = len(df) - 1 - if state != TrendDirection.NEUTRAL and (final_pos - last_pivot_pos) >= depth: - last_pivot_val = pivots_values[-1] - price_move = ( - (highs[final_pos] - last_pivot_val) / last_pivot_val - if state == TrendDirection.UP - else (last_pivot_val - lows[final_pos]) / last_pivot_val + last_pivot_val = pivots_values[-1] + final_price_move = ( + (highs[final_pos] - last_pivot_val) / last_pivot_val + if state == TrendDirection.UP + else (last_pivot_val - lows[final_pos]) / last_pivot_val + ) + if ( + state != TrendDirection.NEUTRAL + and (final_pos - last_pivot_pos) >= depth + and final_price_move >= thresholds[final_pos] + and indices[final_pos] != pivots_indices[-1] + and ( + (state == TrendDirection.UP and is_fractal_high[final_pos]) + or (state == TrendDirection.DOWN and is_fractal_low[final_pos]) + ) + ): + add_pivot( + final_pos, + highs[final_pos] if state == TrendDirection.UP else lows[final_pos], + state, ) - - if ( - price_move >= thresholds[final_pos] - and indices[final_pos] != pivots_indices[-1] - ): - add_pivot( - final_pos, - highs[final_pos] if state == TrendDirection.UP else lows[final_pos], - state, - ) return pivots_indices, pivots_values, pivots_directions