]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
perf(qav3): add depth to zigzag algo to filter labeling noise
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 26 Apr 2025 19:43:23 +0000 (21:43 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 26 Apr 2025 19:43:23 +0000 (21:43 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

index d1b1a44505baed72d75fba5e34a364a770f9043d..11ca199489b2b21a0838c6785e5a1ad1f930172c 100644 (file)
@@ -44,7 +44,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.7.17"
+    version = "3.7.18"
 
     @cached_property
     def _optuna_config(self) -> dict:
@@ -382,7 +382,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         label_period_candles: int,
     ) -> tuple[float, float]:
         temperature = float(
-            self.freqai_info.get("prediction_thresholds_temperature", 175.0)
+            self.freqai_info.get("prediction_thresholds_temperature", 225.0)
         )
         extrema = pred_df[EXTREMA_COLUMN].iloc[
             -(
@@ -784,25 +784,25 @@ def get_optuna_study_model_parameters(
 ) -> dict:
     study_model_parameters = {
         "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
-        "min_child_weight": trial.suggest_int("min_child_weight", 1, 200),
+        "min_child_weight": trial.suggest_int("min_child_weight", 1, 100),
         "subsample": trial.suggest_float("subsample", 0.5, 1.0),
         "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
-        "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True),
-        "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True),
+        "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 100.0, log=True),
+        "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 100.0, log=True),
     }
     if regressor == "xgboost":
         study_model_parameters.update(
             {
                 "max_depth": trial.suggest_int("max_depth", 3, 15),
-                "gamma": trial.suggest_float("gamma", 1e-8, 1.0, log=True),
+                "gamma": trial.suggest_float("gamma", 1e-8, 10.0, log=True),
             }
         )
     elif regressor == "lightgbm":
         study_model_parameters.update(
             {
-                "num_leaves": trial.suggest_int("num_leaves", 2, 256),
+                "num_leaves": trial.suggest_int("num_leaves", 8, 256),
                 "min_split_gain": trial.suggest_float(
-                    "min_split_gain", 1e-8, 1.0, log=True
+                    "min_split_gain", 1e-8, 10.0, log=True
                 ),
                 "min_child_samples": trial.suggest_int("min_child_samples", 10, 100),
             }
@@ -843,110 +843,96 @@ def hp_objective(
     return error
 
 
-def dynamic_zigzag(
+def zigzag(
     df: pd.DataFrame,
     period: int = 14,
-    natr: bool = True,
     ratio: float = 1.0,
+    depth: int = 7,
 ) -> tuple[list[int], list[float], list[int]]:
-    """
-    Calculate the ZigZag indicator for a OHLCV DataFrame with dynamic threshold using ATR/NATR.
-
-    Parameters:
-    df (pd.DataFrame): OHLCV DataFrame.
-    period (int): Period for ATR/NATR calculation (default: 14).
-    natr (bool): Use NATR (True) or ATR (False) (default: True).
-    ratio (float): ratio for dynamic threshold (default: 1.0).
-
-    Returns:
-    tuple: Lists of indices, extrema, and directions.
-    """
-    if df.empty:
+    if df.empty or len(df) < 2:
         return [], [], []
 
-    if natr:
-        thresholds = ta.NATR(df, timeperiod=period)
-    else:
-        thresholds = ta.ATR(df, timeperiod=period)
-    thresholds = thresholds.ffill().bfill() * ratio
-
-    indices = []
-    extrema = []
-    directions = []
-
-    first_high = df["high"].iloc[0]
-    first_low = df["low"].iloc[0]
-    first_threshold = thresholds.iloc[0]
-
-    if natr:
-        first_move = (first_high - first_low) / first_low
-    else:
-        first_move = first_high - first_low
-    if first_move >= first_threshold:
-        current_dir = 1
-        current_extreme = first_high
-    else:
-        current_dir = -1
-        current_extreme = first_low
-    current_extreme_idx = df.index[0]
-
-    indices.append(current_extreme_idx)
-    extrema.append(current_extreme)
-    directions.append(current_dir)
-    last_idx = current_extreme_idx
-
+    indices = df.index.tolist()
+    thresholds = (
+        (ta.NATR(df, timeperiod=period).shift(1) * ratio).fillna(method="bfill").values
+    )
+    highs = df["high"].values
+    lows = df["low"].values
+
+    pivots_indices, pivots_values, pivots_directions = [], [], []
+    state = 0  # 0=neutral, 1=up, -1=down
+    last_pivot_pos = -depth
+
+    def add_pivot(pos: int, value: float, direction: int):
+        nonlocal last_pivot_pos
+        pivots_indices.append(indices[pos])
+        pivots_values.append(value)
+        pivots_directions.append(direction)
+        last_pivot_pos = pos
+
+    def update_last_pivot(pos: int, value: float, direction: int):
+        if pivots_indices:
+            pivots_indices[-1] = indices[pos]
+            pivots_values[-1] = value
+            pivots_directions[-1] = direction
+
+    initial_high = highs[0]
+    initial_low = lows[0]
+    initial_high_pos = 0
+    initial_low_pos = 0
     for i in range(1, len(df)):
-        current_idx = df.index[i]
-        h = df.at[current_idx, "high"]
-        l = df.at[current_idx, "low"]
-        threshold = thresholds.iloc[i]
-
-        if current_dir == 1:  # Looking for higher high
-            if h > current_extreme:
-                current_extreme = h
-                current_extreme_idx = current_idx
-                continue
-            if natr:
-                reversal = (current_extreme - l) / current_extreme >= threshold
-            else:
-                reversal = (current_extreme - l) >= threshold
-            if reversal:
-                if current_extreme_idx != last_idx:
-                    indices.append(current_extreme_idx)
-                    extrema.append(current_extreme)
-                    directions.append(current_dir)
-                    last_idx = current_extreme_idx
-
-                current_dir = -1
-                current_extreme = l
-                current_extreme_idx = current_idx
-
-        elif current_dir == -1:  # Looking for lower low
-            if l < current_extreme:
-                current_extreme = l
-                current_extreme_idx = current_idx
-                continue
-            if natr:
-                reversal = (h - current_extreme) / current_extreme >= threshold
-            else:
-                reversal = (h - current_extreme) >= threshold
-            if reversal:
-                if current_extreme_idx != last_idx:
-                    indices.append(current_extreme_idx)
-                    extrema.append(current_extreme)
-                    directions.append(current_dir)
-                    last_idx = current_extreme_idx
+        if highs[i] > initial_high:
+            initial_high, initial_high_pos = highs[i], i
+        if lows[i] < initial_low:
+            initial_low, initial_low_pos = lows[i], i
+
+        if (highs[i] - initial_low) / initial_low > thresholds[i]:
+            add_pivot(initial_low_pos, initial_low, -1)
+            state = 1
+            break
+        elif (initial_high - lows[i]) / initial_high > thresholds[i]:
+            add_pivot(initial_high_pos, initial_high, 1)
+            state = -1
+            break
+    else:
+        return [], [], []
 
-                current_dir = 1
-                current_extreme = h
-                current_extreme_idx = current_idx
+    for i in range(i + 1, len(df)):
+        if state == 1:
+            if highs[i] > pivots_values[-1]:
+                update_last_pivot(i, highs[i], 1)
+            elif (pivots_values[-1] - lows[i]) / pivots_values[-1] >= thresholds[
+                i
+            ] and (i - last_pivot_pos) >= depth:
+                add_pivot(i, lows[i], -1)
+                state = -1
+        elif state == -1:
+            if lows[i] < pivots_values[-1]:
+                update_last_pivot(i, lows[i], -1)
+            elif (highs[i] - pivots_values[-1]) / pivots_values[-1] >= thresholds[
+                i
+            ] and (i - last_pivot_pos) >= depth:
+                add_pivot(i, highs[i], 1)
+                state = 1
+
+    if state != 0 and (len(df) - 1 - last_pivot_pos) >= depth:
+        final_pos = len(df) - 1
+        last_pivot_val = pivots_values[-1]
+        price_move = (
+            (highs[final_pos] - last_pivot_val) / last_pivot_val
+            if state == 1
+            else (last_pivot_val - lows[final_pos]) / last_pivot_val
+        )
 
-    if current_extreme_idx != last_idx:
-        indices.append(current_extreme_idx)
-        extrema.append(current_extreme)
-        directions.append(current_dir)
+        if (
+            price_move >= thresholds[final_pos]
+            and indices[final_pos] != pivots_indices[-1]
+        ):
+            add_pivot(
+                final_pos, highs[final_pos] if state == 1 else lows[final_pos], state
+            )
 
-    return indices[1:], extrema[1:], directions[1:]
+    return pivots_indices, pivots_values, pivots_directions
 
 
 def label_objective(
@@ -968,7 +954,7 @@ def label_objective(
         max_label_period_candles,
         step=candles_step,
     )
-    label_natr_ratio = trial.suggest_float("label_natr_ratio", 0.0675, 0.175)
+    label_natr_ratio = trial.suggest_float("label_natr_ratio", 0.07, 0.3)
 
     df = df.iloc[
         -(
@@ -980,20 +966,20 @@ def label_objective(
     if df.empty:
         return -float("inf"), -float("inf")
 
-    _, peak_values, _ = dynamic_zigzag(
+    _, pivot_values, _ = zigzag(
         df,
         period=label_period_candles,
         ratio=label_natr_ratio,
     )
 
-    if len(peak_values) < 2:
+    if len(pivot_values) < 2:
         return -float("inf"), -float("inf")
 
     scaled_natr_label_period_candles = (
         ta.NATR(df, timeperiod=label_period_candles) * label_natr_ratio
     )
 
-    return scaled_natr_label_period_candles.median(), len(peak_values)
+    return scaled_natr_label_period_candles.median(), len(pivot_values)
 
 
 def smoothed_max(series: pd.Series, temperature=1.0) -> float:
index 5872dc54a2e83747dee3d4d0c966a1c3d7df351f..24a06aab82f1ef6e98b6d5381bf9de5fb3d9f4ac 100644 (file)
@@ -18,7 +18,7 @@ import pandas_ta as pta
 from Utils import (
     alligator,
     bottom_change_percent,
-    dynamic_zigzag,
+    zigzag,
     ewo,
     non_zero_range,
     price_retracement_percent,
@@ -58,7 +58,7 @@ class QuickAdapterV3(IStrategy):
     INTERFACE_VERSION = 3
 
     def version(self) -> str:
-        return "3.3.11"
+        return "3.3.12"
 
     timeframe = "5m"
 
@@ -384,14 +384,14 @@ class QuickAdapterV3(IStrategy):
 
     def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs):
         pair = str(metadata.get("pair"))
-        peak_indices, _, peak_directions = dynamic_zigzag(
+        pivot_indices, _, pivot_directions = zigzag(
             dataframe,
             period=self.get_label_period_candles(pair),
             ratio=self.get_label_natr_ratio(pair),
         )
         dataframe[EXTREMA_COLUMN] = 0
-        for peak_idx, peak_dir in zip(peak_indices, peak_directions):
-            dataframe.at[peak_idx, EXTREMA_COLUMN] = peak_dir
+        for pivot_idx, pivot_dir in zip(pivot_indices, pivot_directions):
+            dataframe.at[pivot_idx, EXTREMA_COLUMN] = pivot_dir
         dataframe["minima"] = np.where(dataframe[EXTREMA_COLUMN] == -1, -1, 0)
         dataframe["maxima"] = np.where(dataframe[EXTREMA_COLUMN] == 1, 1, 0)
         dataframe[EXTREMA_COLUMN] = self.smooth_extrema(
index e6a248f5eacb7d7d3ceb85d5da5819d6b288a562..475907337517a1bc1005fb237d910035ebd32ce5 100644 (file)
@@ -299,187 +299,92 @@ def alligator(
 
 
 def zigzag(
-    df: pd.DataFrame, threshold: float = 0.05
-) -> tuple[list[int], list[float], list[int]]:
-    """
-    Calculate the ZigZag indicator for a OHLCV DataFrame.
-
-    Parameters:
-    df (pd.DataFrame): OHLCV DataFrame.
-    threshold (float): Percentage threshold for reversal (default 0.05 for 5%).
-
-    Returns:
-    tuple: Lists of indices, extrema, and directions.
-    """
-    if df.empty:
-        return [], [], []
-
-    indices = []
-    extrema = []
-    directions = []
-
-    first_high = df["high"].iloc[0]
-    first_low = df["low"].iloc[0]
-
-    if (first_high - first_low) / first_low >= threshold:
-        current_dir = 1
-        current_extreme = first_high
-    else:
-        current_dir = -1
-        current_extreme = first_low
-    current_extreme_idx = df.index[0]
-
-    indices.append(current_extreme_idx)
-    extrema.append(current_extreme)
-    directions.append(current_dir)
-    last_idx = current_extreme_idx
-
-    for i in range(1, len(df)):
-        current_idx = df.index[i]
-        h = df.at[current_idx, "high"]
-        l = df.at[current_idx, "low"]
-
-        if current_dir == 1:  # Looking for higher high
-            if h > current_extreme:
-                current_extreme = h
-                current_extreme_idx = current_idx
-                continue
-            if (current_extreme - l) / current_extreme >= threshold:
-                if current_extreme_idx != last_idx:
-                    indices.append(current_extreme_idx)
-                    extrema.append(current_extreme)
-                    directions.append(current_dir)
-                    last_idx = current_extreme_idx
-
-                current_dir = -1
-                current_extreme = l
-                current_extreme_idx = current_idx
-
-        elif current_dir == -1:  # Looking for lower low
-            if l < current_extreme:
-                current_extreme = l
-                current_extreme_idx = current_idx
-                continue
-            if (h - current_extreme) / current_extreme >= threshold:
-                if current_extreme_idx != last_idx:
-                    indices.append(current_extreme_idx)
-                    extrema.append(current_extreme)
-                    directions.append(current_dir)
-                    last_idx = current_extreme_idx
-
-                current_dir = 1
-                current_extreme = h
-                current_extreme_idx = current_idx
-
-    if current_extreme_idx != last_idx:
-        indices.append(current_extreme_idx)
-        extrema.append(current_extreme)
-        directions.append(current_dir)
-
-    return indices[1:], extrema[1:], directions[1:]
-
-
-def dynamic_zigzag(
     df: pd.DataFrame,
     period: int = 14,
-    natr: bool = True,
     ratio: float = 1.0,
+    depth: int = 7,
 ) -> tuple[list[int], list[float], list[int]]:
-    """
-    Calculate the ZigZag indicator for a OHLCV DataFrame with dynamic threshold using ATR/NATR.
-
-    Parameters:
-    df (pd.DataFrame): OHLCV DataFrame.
-    period (int): Period for ATR/NATR calculation (default: 14).
-    natr (bool): Use NATR (True) or ATR (False) (default: True).
-    ratio (float): ratio for dynamic threshold (default: 1.0).
-
-    Returns:
-    tuple: Lists of indices, extrema, and directions.
-    """
-    if df.empty:
+    if df.empty or len(df) < 2:
         return [], [], []
 
-    if natr:
-        thresholds = ta.NATR(df, timeperiod=period)
+    indices = df.index.tolist()
+    thresholds = (
+        (ta.NATR(df, timeperiod=period).shift(1) * ratio).fillna(method="bfill").values
+    )
+    highs = df["high"].values
+    lows = df["low"].values
+
+    pivots_indices, pivots_values, pivots_directions = [], [], []
+    state = 0  # 0=neutral, 1=up, -1=down
+    last_pivot_pos = -depth
+
+    def add_pivot(pos: int, value: float, direction: int):
+        nonlocal last_pivot_pos
+        pivots_indices.append(indices[pos])
+        pivots_values.append(value)
+        pivots_directions.append(direction)
+        last_pivot_pos = pos
+
+    def update_last_pivot(pos: int, value: float, direction: int):
+        if pivots_indices:
+            pivots_indices[-1] = indices[pos]
+            pivots_values[-1] = value
+            pivots_directions[-1] = direction
+
+    initial_high = highs[0]
+    initial_low = lows[0]
+    initial_high_pos = 0
+    initial_low_pos = 0
+    for i in range(1, len(df)):
+        if highs[i] > initial_high:
+            initial_high, initial_high_pos = highs[i], i
+        if lows[i] < initial_low:
+            initial_low, initial_low_pos = lows[i], i
+
+        if (highs[i] - initial_low) / initial_low > thresholds[i]:
+            add_pivot(initial_low_pos, initial_low, -1)
+            state = 1
+            break
+        elif (initial_high - lows[i]) / initial_high > thresholds[i]:
+            add_pivot(initial_high_pos, initial_high, 1)
+            state = -1
+            break
     else:
-        thresholds = ta.ATR(df, timeperiod=period)
-    thresholds = thresholds.ffill().bfill() * ratio
-
-    indices = []
-    extrema = []
-    directions = []
-
-    first_high = df["high"].iloc[0]
-    first_low = df["low"].iloc[0]
-    first_threshold = thresholds.iloc[0]
+        return [], [], []
 
-    if natr:
-        first_move = (first_high - first_low) / first_low
-    else:
-        first_move = first_high - first_low
-    if first_move >= first_threshold:
-        current_dir = 1
-        current_extreme = first_high
-    else:
-        current_dir = -1
-        current_extreme = first_low
-    current_extreme_idx = df.index[0]
+    for i in range(i + 1, len(df)):
+        if state == 1:
+            if highs[i] > pivots_values[-1]:
+                update_last_pivot(i, highs[i], 1)
+            elif (pivots_values[-1] - lows[i]) / pivots_values[-1] >= thresholds[
+                i
+            ] and (i - last_pivot_pos) >= depth:
+                add_pivot(i, lows[i], -1)
+                state = -1
+        elif state == -1:
+            if lows[i] < pivots_values[-1]:
+                update_last_pivot(i, lows[i], -1)
+            elif (highs[i] - pivots_values[-1]) / pivots_values[-1] >= thresholds[
+                i
+            ] and (i - last_pivot_pos) >= depth:
+                add_pivot(i, highs[i], 1)
+                state = 1
+
+    if state != 0 and (len(df) - 1 - last_pivot_pos) >= depth:
+        final_pos = len(df) - 1
+        last_pivot_val = pivots_values[-1]
+        price_move = (
+            (highs[final_pos] - last_pivot_val) / last_pivot_val
+            if state == 1
+            else (last_pivot_val - lows[final_pos]) / last_pivot_val
+        )
 
-    indices.append(current_extreme_idx)
-    extrema.append(current_extreme)
-    directions.append(current_dir)
-    last_idx = current_extreme_idx
+        if (
+            price_move >= thresholds[final_pos]
+            and indices[final_pos] != pivots_indices[-1]
+        ):
+            add_pivot(
+                final_pos, highs[final_pos] if state == 1 else lows[final_pos], state
+            )
 
-    for i in range(1, len(df)):
-        current_idx = df.index[i]
-        h = df.at[current_idx, "high"]
-        l = df.at[current_idx, "low"]
-        threshold = thresholds.iloc[i]
-
-        if current_dir == 1:  # Looking for higher high
-            if h > current_extreme:
-                current_extreme = h
-                current_extreme_idx = current_idx
-                continue
-            if natr:
-                reversal = (current_extreme - l) / current_extreme >= threshold
-            else:
-                reversal = (current_extreme - l) >= threshold
-            if reversal:
-                if current_extreme_idx != last_idx:
-                    indices.append(current_extreme_idx)
-                    extrema.append(current_extreme)
-                    directions.append(current_dir)
-                    last_idx = current_extreme_idx
-
-                current_dir = -1
-                current_extreme = l
-                current_extreme_idx = current_idx
-
-        elif current_dir == -1:  # Looking for lower low
-            if l < current_extreme:
-                current_extreme = l
-                current_extreme_idx = current_idx
-                continue
-            if natr:
-                reversal = (h - current_extreme) / current_extreme >= threshold
-            else:
-                reversal = (h - current_extreme) >= threshold
-            if reversal:
-                if current_extreme_idx != last_idx:
-                    indices.append(current_extreme_idx)
-                    extrema.append(current_extreme)
-                    directions.append(current_dir)
-                    last_idx = current_extreme_idx
-
-                current_dir = 1
-                current_extreme = h
-                current_extreme_idx = current_idx
-
-    if current_extreme_idx != last_idx:
-        indices.append(current_extreme_idx)
-        extrema.append(current_extreme)
-        directions.append(current_dir)
-
-    return indices[1:], extrema[1:], directions[1:]
+    return pivots_indices, pivots_values, pivots_directions