From: Jérôme Benoit Date: Sat, 26 Apr 2025 19:43:23 +0000 (+0200) Subject: perf(qav3): add depth to zigzag algo to filter labeling noise X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=22945bb809b9a32abbfae76111c6f97f299da4b2;p=freqai-strategies.git perf(qav3): add depth to zigzag algo to filter labeling noise Signed-off-by: Jérôme Benoit --- diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index d1b1a44..11ca199 100644 --- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -44,7 +44,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): https://github.com/sponsors/robcaulk """ - version = "3.7.17" + version = "3.7.18" @cached_property def _optuna_config(self) -> dict: @@ -382,7 +382,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel): label_period_candles: int, ) -> tuple[float, float]: temperature = float( - self.freqai_info.get("prediction_thresholds_temperature", 175.0) + self.freqai_info.get("prediction_thresholds_temperature", 225.0) ) extrema = pred_df[EXTREMA_COLUMN].iloc[ -( @@ -784,25 +784,25 @@ def get_optuna_study_model_parameters( ) -> dict: study_model_parameters = { "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True), - "min_child_weight": trial.suggest_int("min_child_weight", 1, 200), + "min_child_weight": trial.suggest_int("min_child_weight", 1, 100), "subsample": trial.suggest_float("subsample", 0.5, 1.0), "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0), - "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True), - "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True), + "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 100.0, log=True), + "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 100.0, log=True), } if regressor == "xgboost": study_model_parameters.update( { "max_depth": trial.suggest_int("max_depth", 3, 15), - "gamma": trial.suggest_float("gamma", 1e-8, 1.0, log=True), + "gamma": trial.suggest_float("gamma", 1e-8, 10.0, log=True), } ) elif regressor == "lightgbm": study_model_parameters.update( { - "num_leaves": trial.suggest_int("num_leaves", 2, 256), + "num_leaves": trial.suggest_int("num_leaves", 8, 256), "min_split_gain": trial.suggest_float( - "min_split_gain", 1e-8, 1.0, log=True + "min_split_gain", 1e-8, 10.0, log=True ), "min_child_samples": trial.suggest_int("min_child_samples", 10, 100), } @@ -843,110 +843,96 @@ def hp_objective( return error -def dynamic_zigzag( +def zigzag( df: pd.DataFrame, period: int = 14, - natr: bool = True, ratio: float = 1.0, + depth: int = 7, ) -> tuple[list[int], list[float], list[int]]: - """ - Calculate the ZigZag indicator for a OHLCV DataFrame with dynamic threshold using ATR/NATR. - - Parameters: - df (pd.DataFrame): OHLCV DataFrame. - period (int): Period for ATR/NATR calculation (default: 14). - natr (bool): Use NATR (True) or ATR (False) (default: True). - ratio (float): ratio for dynamic threshold (default: 1.0). - - Returns: - tuple: Lists of indices, extrema, and directions. - """ - if df.empty: + if df.empty or len(df) < 2: return [], [], [] - if natr: - thresholds = ta.NATR(df, timeperiod=period) - else: - thresholds = ta.ATR(df, timeperiod=period) - thresholds = thresholds.ffill().bfill() * ratio - - indices = [] - extrema = [] - directions = [] - - first_high = df["high"].iloc[0] - first_low = df["low"].iloc[0] - first_threshold = thresholds.iloc[0] - - if natr: - first_move = (first_high - first_low) / first_low - else: - first_move = first_high - first_low - if first_move >= first_threshold: - current_dir = 1 - current_extreme = first_high - else: - current_dir = -1 - current_extreme = first_low - current_extreme_idx = df.index[0] - - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - last_idx = current_extreme_idx - + indices = df.index.tolist() + thresholds = ( + (ta.NATR(df, timeperiod=period).shift(1) * ratio).fillna(method="bfill").values + ) + highs = df["high"].values + lows = df["low"].values + + pivots_indices, pivots_values, pivots_directions = [], [], [] + state = 0 # 0=neutral, 1=up, -1=down + last_pivot_pos = -depth + + def add_pivot(pos: int, value: float, direction: int): + nonlocal last_pivot_pos + pivots_indices.append(indices[pos]) + pivots_values.append(value) + pivots_directions.append(direction) + last_pivot_pos = pos + + def update_last_pivot(pos: int, value: float, direction: int): + if pivots_indices: + pivots_indices[-1] = indices[pos] + pivots_values[-1] = value + pivots_directions[-1] = direction + + initial_high = highs[0] + initial_low = lows[0] + initial_high_pos = 0 + initial_low_pos = 0 for i in range(1, len(df)): - current_idx = df.index[i] - h = df.at[current_idx, "high"] - l = df.at[current_idx, "low"] - threshold = thresholds.iloc[i] - - if current_dir == 1: # Looking for higher high - if h > current_extreme: - current_extreme = h - current_extreme_idx = current_idx - continue - if natr: - reversal = (current_extreme - l) / current_extreme >= threshold - else: - reversal = (current_extreme - l) >= threshold - if reversal: - if current_extreme_idx != last_idx: - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - last_idx = current_extreme_idx - - current_dir = -1 - current_extreme = l - current_extreme_idx = current_idx - - elif current_dir == -1: # Looking for lower low - if l < current_extreme: - current_extreme = l - current_extreme_idx = current_idx - continue - if natr: - reversal = (h - current_extreme) / current_extreme >= threshold - else: - reversal = (h - current_extreme) >= threshold - if reversal: - if current_extreme_idx != last_idx: - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - last_idx = current_extreme_idx + if highs[i] > initial_high: + initial_high, initial_high_pos = highs[i], i + if lows[i] < initial_low: + initial_low, initial_low_pos = lows[i], i + + if (highs[i] - initial_low) / initial_low > thresholds[i]: + add_pivot(initial_low_pos, initial_low, -1) + state = 1 + break + elif (initial_high - lows[i]) / initial_high > thresholds[i]: + add_pivot(initial_high_pos, initial_high, 1) + state = -1 + break + else: + return [], [], [] - current_dir = 1 - current_extreme = h - current_extreme_idx = current_idx + for i in range(i + 1, len(df)): + if state == 1: + if highs[i] > pivots_values[-1]: + update_last_pivot(i, highs[i], 1) + elif (pivots_values[-1] - lows[i]) / pivots_values[-1] >= thresholds[ + i + ] and (i - last_pivot_pos) >= depth: + add_pivot(i, lows[i], -1) + state = -1 + elif state == -1: + if lows[i] < pivots_values[-1]: + update_last_pivot(i, lows[i], -1) + elif (highs[i] - pivots_values[-1]) / pivots_values[-1] >= thresholds[ + i + ] and (i - last_pivot_pos) >= depth: + add_pivot(i, highs[i], 1) + state = 1 + + if state != 0 and (len(df) - 1 - last_pivot_pos) >= depth: + final_pos = len(df) - 1 + last_pivot_val = pivots_values[-1] + price_move = ( + (highs[final_pos] - last_pivot_val) / last_pivot_val + if state == 1 + else (last_pivot_val - lows[final_pos]) / last_pivot_val + ) - if current_extreme_idx != last_idx: - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) + if ( + price_move >= thresholds[final_pos] + and indices[final_pos] != pivots_indices[-1] + ): + add_pivot( + final_pos, highs[final_pos] if state == 1 else lows[final_pos], state + ) - return indices[1:], extrema[1:], directions[1:] + return pivots_indices, pivots_values, pivots_directions def label_objective( @@ -968,7 +954,7 @@ def label_objective( max_label_period_candles, step=candles_step, ) - label_natr_ratio = trial.suggest_float("label_natr_ratio", 0.0675, 0.175) + label_natr_ratio = trial.suggest_float("label_natr_ratio", 0.07, 0.3) df = df.iloc[ -( @@ -980,20 +966,20 @@ def label_objective( if df.empty: return -float("inf"), -float("inf") - _, peak_values, _ = dynamic_zigzag( + _, pivot_values, _ = zigzag( df, period=label_period_candles, ratio=label_natr_ratio, ) - if len(peak_values) < 2: + if len(pivot_values) < 2: return -float("inf"), -float("inf") scaled_natr_label_period_candles = ( ta.NATR(df, timeperiod=label_period_candles) * label_natr_ratio ) - return scaled_natr_label_period_candles.median(), len(peak_values) + return scaled_natr_label_period_candles.median(), len(pivot_values) def smoothed_max(series: pd.Series, temperature=1.0) -> float: diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index 5872dc5..24a06aa 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -18,7 +18,7 @@ import pandas_ta as pta from Utils import ( alligator, bottom_change_percent, - dynamic_zigzag, + zigzag, ewo, non_zero_range, price_retracement_percent, @@ -58,7 +58,7 @@ class QuickAdapterV3(IStrategy): INTERFACE_VERSION = 3 def version(self) -> str: - return "3.3.11" + return "3.3.12" timeframe = "5m" @@ -384,14 +384,14 @@ class QuickAdapterV3(IStrategy): def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs): pair = str(metadata.get("pair")) - peak_indices, _, peak_directions = dynamic_zigzag( + pivot_indices, _, pivot_directions = zigzag( dataframe, period=self.get_label_period_candles(pair), ratio=self.get_label_natr_ratio(pair), ) dataframe[EXTREMA_COLUMN] = 0 - for peak_idx, peak_dir in zip(peak_indices, peak_directions): - dataframe.at[peak_idx, EXTREMA_COLUMN] = peak_dir + for pivot_idx, pivot_dir in zip(pivot_indices, pivot_directions): + dataframe.at[pivot_idx, EXTREMA_COLUMN] = pivot_dir dataframe["minima"] = np.where(dataframe[EXTREMA_COLUMN] == -1, -1, 0) dataframe["maxima"] = np.where(dataframe[EXTREMA_COLUMN] == 1, 1, 0) dataframe[EXTREMA_COLUMN] = self.smooth_extrema( diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index e6a248f..4759073 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -299,187 +299,92 @@ def alligator( def zigzag( - df: pd.DataFrame, threshold: float = 0.05 -) -> tuple[list[int], list[float], list[int]]: - """ - Calculate the ZigZag indicator for a OHLCV DataFrame. - - Parameters: - df (pd.DataFrame): OHLCV DataFrame. - threshold (float): Percentage threshold for reversal (default 0.05 for 5%). - - Returns: - tuple: Lists of indices, extrema, and directions. - """ - if df.empty: - return [], [], [] - - indices = [] - extrema = [] - directions = [] - - first_high = df["high"].iloc[0] - first_low = df["low"].iloc[0] - - if (first_high - first_low) / first_low >= threshold: - current_dir = 1 - current_extreme = first_high - else: - current_dir = -1 - current_extreme = first_low - current_extreme_idx = df.index[0] - - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - last_idx = current_extreme_idx - - for i in range(1, len(df)): - current_idx = df.index[i] - h = df.at[current_idx, "high"] - l = df.at[current_idx, "low"] - - if current_dir == 1: # Looking for higher high - if h > current_extreme: - current_extreme = h - current_extreme_idx = current_idx - continue - if (current_extreme - l) / current_extreme >= threshold: - if current_extreme_idx != last_idx: - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - last_idx = current_extreme_idx - - current_dir = -1 - current_extreme = l - current_extreme_idx = current_idx - - elif current_dir == -1: # Looking for lower low - if l < current_extreme: - current_extreme = l - current_extreme_idx = current_idx - continue - if (h - current_extreme) / current_extreme >= threshold: - if current_extreme_idx != last_idx: - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - last_idx = current_extreme_idx - - current_dir = 1 - current_extreme = h - current_extreme_idx = current_idx - - if current_extreme_idx != last_idx: - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - - return indices[1:], extrema[1:], directions[1:] - - -def dynamic_zigzag( df: pd.DataFrame, period: int = 14, - natr: bool = True, ratio: float = 1.0, + depth: int = 7, ) -> tuple[list[int], list[float], list[int]]: - """ - Calculate the ZigZag indicator for a OHLCV DataFrame with dynamic threshold using ATR/NATR. - - Parameters: - df (pd.DataFrame): OHLCV DataFrame. - period (int): Period for ATR/NATR calculation (default: 14). - natr (bool): Use NATR (True) or ATR (False) (default: True). - ratio (float): ratio for dynamic threshold (default: 1.0). - - Returns: - tuple: Lists of indices, extrema, and directions. - """ - if df.empty: + if df.empty or len(df) < 2: return [], [], [] - if natr: - thresholds = ta.NATR(df, timeperiod=period) + indices = df.index.tolist() + thresholds = ( + (ta.NATR(df, timeperiod=period).shift(1) * ratio).fillna(method="bfill").values + ) + highs = df["high"].values + lows = df["low"].values + + pivots_indices, pivots_values, pivots_directions = [], [], [] + state = 0 # 0=neutral, 1=up, -1=down + last_pivot_pos = -depth + + def add_pivot(pos: int, value: float, direction: int): + nonlocal last_pivot_pos + pivots_indices.append(indices[pos]) + pivots_values.append(value) + pivots_directions.append(direction) + last_pivot_pos = pos + + def update_last_pivot(pos: int, value: float, direction: int): + if pivots_indices: + pivots_indices[-1] = indices[pos] + pivots_values[-1] = value + pivots_directions[-1] = direction + + initial_high = highs[0] + initial_low = lows[0] + initial_high_pos = 0 + initial_low_pos = 0 + for i in range(1, len(df)): + if highs[i] > initial_high: + initial_high, initial_high_pos = highs[i], i + if lows[i] < initial_low: + initial_low, initial_low_pos = lows[i], i + + if (highs[i] - initial_low) / initial_low > thresholds[i]: + add_pivot(initial_low_pos, initial_low, -1) + state = 1 + break + elif (initial_high - lows[i]) / initial_high > thresholds[i]: + add_pivot(initial_high_pos, initial_high, 1) + state = -1 + break else: - thresholds = ta.ATR(df, timeperiod=period) - thresholds = thresholds.ffill().bfill() * ratio - - indices = [] - extrema = [] - directions = [] - - first_high = df["high"].iloc[0] - first_low = df["low"].iloc[0] - first_threshold = thresholds.iloc[0] + return [], [], [] - if natr: - first_move = (first_high - first_low) / first_low - else: - first_move = first_high - first_low - if first_move >= first_threshold: - current_dir = 1 - current_extreme = first_high - else: - current_dir = -1 - current_extreme = first_low - current_extreme_idx = df.index[0] + for i in range(i + 1, len(df)): + if state == 1: + if highs[i] > pivots_values[-1]: + update_last_pivot(i, highs[i], 1) + elif (pivots_values[-1] - lows[i]) / pivots_values[-1] >= thresholds[ + i + ] and (i - last_pivot_pos) >= depth: + add_pivot(i, lows[i], -1) + state = -1 + elif state == -1: + if lows[i] < pivots_values[-1]: + update_last_pivot(i, lows[i], -1) + elif (highs[i] - pivots_values[-1]) / pivots_values[-1] >= thresholds[ + i + ] and (i - last_pivot_pos) >= depth: + add_pivot(i, highs[i], 1) + state = 1 + + if state != 0 and (len(df) - 1 - last_pivot_pos) >= depth: + final_pos = len(df) - 1 + last_pivot_val = pivots_values[-1] + price_move = ( + (highs[final_pos] - last_pivot_val) / last_pivot_val + if state == 1 + else (last_pivot_val - lows[final_pos]) / last_pivot_val + ) - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - last_idx = current_extreme_idx + if ( + price_move >= thresholds[final_pos] + and indices[final_pos] != pivots_indices[-1] + ): + add_pivot( + final_pos, highs[final_pos] if state == 1 else lows[final_pos], state + ) - for i in range(1, len(df)): - current_idx = df.index[i] - h = df.at[current_idx, "high"] - l = df.at[current_idx, "low"] - threshold = thresholds.iloc[i] - - if current_dir == 1: # Looking for higher high - if h > current_extreme: - current_extreme = h - current_extreme_idx = current_idx - continue - if natr: - reversal = (current_extreme - l) / current_extreme >= threshold - else: - reversal = (current_extreme - l) >= threshold - if reversal: - if current_extreme_idx != last_idx: - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - last_idx = current_extreme_idx - - current_dir = -1 - current_extreme = l - current_extreme_idx = current_idx - - elif current_dir == -1: # Looking for lower low - if l < current_extreme: - current_extreme = l - current_extreme_idx = current_idx - continue - if natr: - reversal = (h - current_extreme) / current_extreme >= threshold - else: - reversal = (h - current_extreme) >= threshold - if reversal: - if current_extreme_idx != last_idx: - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - last_idx = current_extreme_idx - - current_dir = 1 - current_extreme = h - current_extreme_idx = current_idx - - if current_extreme_idx != last_idx: - indices.append(current_extreme_idx) - extrema.append(current_extreme) - directions.append(current_dir) - - return indices[1:], extrema[1:], directions[1:] + return pivots_indices, pivots_values, pivots_directions