Piment Noir Git Repositories - freqai-strategies.git/commitdiff
feat(qav3): zigzag-compatible peaks identification optimisation
author     Jérôme Benoit <jerome.benoit@piment-noir.org>
           Wed, 9 Apr 2025 13:44:51 +0000 (15:44 +0200)
committer  Jérôme Benoit <jerome.benoit@piment-noir.org>
           Wed, 9 Apr 2025 13:44:51 +0000 (15:44 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/config-template.json
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
quickadapter/user_data/strategies/QuickAdapterV3.py
quickadapter/user_data/strategies/Utils.py

diff --git a/quickadapter/user_data/config-template.json b/quickadapter/user_data/config-template.json
index 1b686757eb659c0db7290cad99bee9c7c3a01f31..975095dc4f126b285794fb599052b1c2e25b16b1 100644
       "DI_cutoff": 2,
       "&s-minima_threshold": -2,
       "&s-maxima_threshold": 2,
+      "label_period_candles": 100,
+      "label_natr_ratio": 0.075,
       "hp_rmse": -1,
       "train_rmse": -1
     },
diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
index f51ac5df8055acd9d67e5852dcb0d4f88b087ea7..154f264bf34ed3284a8ae45a036910272d41e94b 100644
@@ -7,6 +7,7 @@ import scipy as sp
 import optuna
 import sklearn
 import warnings
+import talib.abstract as ta
 
 from functools import cached_property
 from typing import Any, Callable, Optional
@@ -43,7 +44,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     https://github.com/sponsors/robcaulk
     """
 
-    version = "3.6.9"
+    version = "3.7.0"
 
     @cached_property
     def _optuna_config(self) -> dict:
@@ -86,17 +87,26 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         )
         self._optuna_hp_rmse: dict[str, float] = {}
         self._optuna_train_rmse: dict[str, float] = {}
+        self._optuna_label_values: dict[str, list] = {}
         self._optuna_hp_params: dict[str, dict] = {}
         self._optuna_train_params: dict[str, dict] = {}
+        self._optuna_label_params: dict[str, dict] = {}
         for pair in self.pairs:
             self._optuna_hp_rmse[pair] = -1
             self._optuna_train_rmse[pair] = -1
+            self._optuna_label_values[pair] = [-1, -1]
             self._optuna_hp_params[pair] = (
                 self.optuna_load_best_params(pair, "hp") or {}
             )
             self._optuna_train_params[pair] = (
                 self.optuna_load_best_params(pair, "train") or {}
             )
+            self._optuna_label_params[pair] = self.optuna_load_best_params(
+                pair, "label"
+            ) or {
+                "label_period_candles": self.ft_params.get("label_period_candles", 50),
+                "label_natr_ratio": self.ft_params.get("label_natr_ratio", 0.075),
+            }
         logger.info(
             f"Initialized {self.__class__.__name__} {self.freqai_info.get('regressor', 'xgboost')} regressor model version {self.version}"
         )
@@ -106,6 +116,8 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
             params = self._optuna_hp_params.get(pair)
         elif namespace == "train":
             params = self._optuna_train_params.get(pair)
+        elif namespace == "label":
+            params = self._optuna_label_params.get(pair)
         else:
             raise ValueError(f"Invalid namespace: {namespace}")
         return params
@@ -115,6 +127,8 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
             self._optuna_hp_params[pair] = params
         elif namespace == "train":
             self._optuna_train_params[pair] = params
+        elif namespace == "label":
+            self._optuna_label_params[pair] = params
         else:
             raise ValueError(f"Invalid namespace: {namespace}")
 
@@ -135,6 +149,19 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         else:
             raise ValueError(f"Invalid namespace: {namespace}")
 
+    def get_optuna_values(self, pair: str, namespace: str) -> list:
+        if namespace == "label":
+            values = self._optuna_label_values.get(pair)
+        else:
+            raise ValueError(f"Invalid namespace: {namespace}")
+        return values
+
+    def set_optuna_values(self, pair: str, namespace: str, values: list) -> None:
+        if namespace == "label":
+            self._optuna_label_values[pair] = values
+        else:
+            raise ValueError(f"Invalid namespace: {namespace}")
+
     def fit(self, data_dictionary: dict, dk: FreqaiDataKitchen, **kwargs) -> Any:
         """
         User sets up the training and test data to fit their desired model here
@@ -158,9 +185,9 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         start = time.time()
         if self._optuna_hyperopt:
             self.optuna_optimize(
-                dk.pair,
-                "hp",
-                lambda trial: hp_objective(
+                pair=dk.pair,
+                namespace="hp",
+                objective=lambda trial: hp_objective(
                     trial,
                     self.freqai_info.get("regressor", "xgboost"),
                     X,
@@ -171,6 +198,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
                     test_weights,
                     model_training_parameters,
                 ),
+                direction=optuna.study.StudyDirection.MINIMIZE,
             )
 
             optuna_hp_params = self.get_optuna_params(dk.pair, "hp")
@@ -181,9 +209,9 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
                 }
 
             self.optuna_optimize(
-                dk.pair,
-                "train",
-                lambda trial: train_objective(
+                pair=dk.pair,
+                namespace="train",
+                objective=lambda trial: train_objective(
                     trial,
                     self.freqai_info.get("regressor", "xgboost"),
                     X,
@@ -196,6 +224,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
                     self._optuna_config.get("candles_step"),
                     model_training_parameters,
                 ),
+                direction=optuna.study.StudyDirection.MINIMIZE,
             )
 
             optuna_train_params = self.get_optuna_params(dk.pair, "train")
@@ -212,7 +241,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
 
         eval_set, eval_weights = self.eval_set_and_weights(X_test, y_test, test_weights)
 
-        model = train_regressor(
+        model = fit_regressor(
             regressor=self.freqai_info.get("regressor", "xgboost"),
             X=X,
             y=y,
@@ -230,12 +259,31 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     def fit_live_predictions(self, dk: FreqaiDataKitchen, pair: str) -> None:
         warmed_up = True
 
-        num_candles = self.freqai_info.get("fit_live_predictions_candles", 100)
+        fit_live_predictions_candles = self.freqai_info.get(
+            "fit_live_predictions_candles", 100
+        )
+
+        df = self.data_provider.get_pair_dataframe(pair)
+        self.optuna_optimize(
+            pair=pair,
+            namespace="label",
+            objective=lambda trial: label_objective(
+                trial,
+                df,
+                fit_live_predictions_candles,
+                self._optuna_config.get("candles_step"),
+            ),
+            directions=[
+                optuna.study.StudyDirection.MAXIMIZE,
+                optuna.study.StudyDirection.MAXIMIZE,
+            ],
+        )
+
         if self.live:
             if not hasattr(self, "exchange_candles"):
                 self.exchange_candles = len(self.dd.model_return_values[pair].index)
             candles_diff = len(self.dd.historic_predictions[pair].index) - (
-                num_candles + self.exchange_candles
+                fit_live_predictions_candles + self.exchange_candles
             )
             if candles_diff < 0:
                 logger.warning(
@@ -245,7 +293,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
 
         pred_df_full = (
             self.dd.historic_predictions[pair]
-            .iloc[-num_candles:]
+            .iloc[-fit_live_predictions_candles:]
             .reset_index(drop=True)
         )
 
@@ -286,6 +334,13 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         dk.data["extra_returns_per_train"]["DI_value_param3"] = f[2]
         dk.data["extra_returns_per_train"]["DI_cutoff"] = cutoff
 
+        dk.data["extra_returns_per_train"]["label_period_candles"] = (
+            self.get_optuna_params(pair, "label").get("label_period_candles")
+        )
+        dk.data["extra_returns_per_train"]["label_natr_ratio"] = self.get_optuna_params(
+            pair, "label"
+        ).get("label_natr_ratio")
+
         dk.data["extra_returns_per_train"]["hp_rmse"] = self.get_optuna_rmse(pair, "hp")
         dk.data["extra_returns_per_train"]["train_rmse"] = self.get_optuna_rmse(
             pair, "train"
@@ -317,21 +372,77 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         )
         return min_pred, max_pred
 
+    @staticmethod
+    def get_multi_objective_study_best_trial(
+        namespace: str, study: optuna.study.Study
+    ) -> Optional[optuna.trial.FrozenTrial]:
+        if not QuickAdapterRegressorV3.optuna_study_has_best_trials(study):
+            return None
+        best_trials = study.best_trials
+        if namespace == "label":
+            range_sizes = [trial.values[1] for trial in best_trials]
+            mean_range_size = np.mean(range_sizes)
+            equal_mean_trials = [
+                trial
+                for trial in best_trials
+                if np.isclose(trial.values[1], mean_range_size)
+            ]
+            if equal_mean_trials:
+                return max(equal_mean_trials, key=lambda trial: trial.values[0])
+            nearest_above_mean = (np.inf, -np.inf, None)
+            nearest_below_mean = (-np.inf, -np.inf, None)
+            for idx, trial in enumerate(best_trials):
+                range_size = trial.values[1]
+                if range_size >= mean_range_size:
+                    if range_size < nearest_above_mean[0] or (
+                        range_size == nearest_above_mean[0]
+                        and trial.values[0] > nearest_above_mean[1]
+                    ):
+                        nearest_above_mean = (range_size, trial.values[0], idx)
+                if range_size <= mean_range_size:
+                    if range_size > nearest_below_mean[0] or (
+                        range_size == nearest_below_mean[0]
+                        and trial.values[0] > nearest_below_mean[1]
+                    ):
+                        nearest_below_mean = (range_size, trial.values[0], idx)
+            if nearest_above_mean[2] is None or nearest_below_mean[2] is None:
+                return None
+            above_mean_trial = best_trials[nearest_above_mean[2]]
+            below_mean_trial = best_trials[nearest_below_mean[2]]
+            if above_mean_trial.values[0] >= below_mean_trial.values[0]:
+                return above_mean_trial
+            else:
+                return below_mean_trial
+        else:
+            raise ValueError(f"Invalid namespace: {namespace}")
+
     def optuna_optimize(
         self,
         pair: str,
         namespace: str,
-        objective: Callable[[optuna.Trial], float],
+        objective: Callable[[optuna.trial.Trial], float],
+        direction: Optional[optuna.study.StudyDirection] = None,
+        directions: Optional[list[optuna.study.StudyDirection]] = None,
     ) -> None:
         identifier = self.freqai_info.get("identifier")
-        study = self.optuna_create_study(pair, f"{identifier}-{namespace}-{pair}")
+        study = self.optuna_create_study(
+            pair=pair,
+            study_name=f"{identifier}-{pair}-{namespace}",
+            direction=direction,
+            directions=directions,
+        )
         if not study:
             return
 
         if self._optuna_config.get("warm_start"):
             self.optuna_enqueue_previous_best_params(pair, namespace, study)
 
-        logger.info(f"Optuna {namespace} hyperopt started")
+        is_study_multi_objective = direction is None and directions is not None
+        if is_study_multi_objective is True:
+            objective_type = "multi objective"
+        else:
+            objective_type = "single objective"
+        logger.info(f"Optuna {pair} {namespace} {objective_type} hyperopt started")
         start_time = time.time()
         try:
             study.optimize(
@@ -344,26 +455,47 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         except Exception as e:
             time_spent = time.time() - start_time
             logger.error(
-                f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): {str(e)}",
+                f"Optuna {pair} {namespace} {objective_type} hyperopt failed ({time_spent:.2f} secs): {str(e)}",
                 exc_info=True,
             )
             return
 
         time_spent = time.time() - start_time
-        if QuickAdapterRegressorV3.optuna_study_has_best_params(study):
-            logger.info(f"Optuna {namespace} hyperopt done ({time_spent:.2f} secs)")
-            self.set_optuna_params(pair, namespace, study.best_params)
+        if is_study_multi_objective is False:
+            if not QuickAdapterRegressorV3.optuna_study_has_best_params(study):
+                logger.error(
+                    f"Optuna {pair} {namespace} {objective_type} hyperopt failed ({time_spent:.2f} secs): no study best params found"
+                )
+                return
             self.set_optuna_rmse(pair, namespace, study.best_value)
-            for key, value in {
+            self.set_optuna_params(pair, namespace, study.best_params)
+            study_results = {
                 "rmse": self.get_optuna_rmse(pair, namespace),
                 **self.get_optuna_params(pair, namespace),
-            }.items():
-                logger.info(f"Optuna {namespace} hyperopt | {key:>20s} : {value}")
-            self.optuna_save_best_params(pair, namespace)
+            }
         else:
-            logger.error(
-                f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
+            best_trial = QuickAdapterRegressorV3.get_multi_objective_study_best_trial(
+                "label", study
+            )
+            if not best_trial:
+                logger.error(
+                    f"Optuna {pair} {namespace} {objective_type} hyperopt failed ({time_spent:.2f} secs): no study best trial found"
+                )
+                return
+            self.set_optuna_values(pair, namespace, best_trial.values)
+            self.set_optuna_params(pair, namespace, best_trial.params)
+            study_results = {
+                "values": self.get_optuna_values(pair, namespace),
+                **self.get_optuna_params(pair, namespace),
+            }
+        logger.info(
+            f"Optuna {pair} {namespace} {objective_type} hyperopt done ({time_spent:.2f} secs)"
+        )
+        for key, value in study_results.items():
+            logger.info(
+                f"Optuna {pair} {namespace} {objective_type} hyperopt | {key:>20s} : {value}"
             )
+        self.optuna_save_best_params(pair, namespace)
 
     def optuna_storage(self, pair: str) -> optuna.storages.BaseStorage:
         storage_dir = self.full_path
@@ -384,7 +516,11 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         return storage
 
     def optuna_create_study(
-        self, pair: str, study_name: str
+        self,
+        pair: str,
+        study_name: str,
+        direction: Optional[optuna.study.StudyDirection] = None,
+        directions: Optional[list[optuna.study.StudyDirection]] = None,
     ) -> Optional[optuna.study.Study]:
         try:
             storage = self.optuna_storage(pair)
@@ -405,7 +541,8 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
                     multivariate=True, group=True, seed=1
                 ),
                 pruner=optuna.pruners.HyperbandPruner(),
-                direction=optuna.study.StudyDirection.MINIMIZE,
+                direction=direction,
+                directions=directions,
                 storage=storage,
                 load_if_exists=not self._optuna_config.get("continuous"),
             )
@@ -482,8 +619,22 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         except ValueError:
             return False
 
+    @staticmethod
+    def optuna_study_has_best_trials(study: Optional[optuna.study.Study]) -> bool:
+        if study is None:
+            return False
+        try:
+            _ = study.best_trials
+            return True
+        # file backend storage raises KeyError
+        except KeyError:
+            return False
+        # sqlite backend storage raises ValueError
+        except ValueError:
+            return False
+
 
-def get_callbacks(trial: optuna.Trial, regressor: str) -> list[Callable]:
+def get_callbacks(trial: optuna.trial.Trial, regressor: str) -> list[Callable]:
     if regressor == "xgboost":
         callbacks = [
             optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")
@@ -495,7 +646,7 @@ def get_callbacks(trial: optuna.Trial, regressor: str) -> list[Callable]:
     return callbacks
 
 
-def train_regressor(
+def fit_regressor(
     regressor: str,
     X: pd.DataFrame,
     y: pd.DataFrame,
@@ -542,21 +693,8 @@ def train_regressor(
     return model
 
 
-def round_to_nearest(value: float, step: int) -> int:
-    """
-    Round a value to the nearest multiple of a given step.
-    :param value: The value to round.
-    :param step: The step size to round to (must be non-zero).
-    :return: The rounded value.
-    :raises ValueError: If step is zero.
-    """
-    if step == 0:
-        raise ValueError("step must be non-zero")
-    return int(round(value / step) * step)
-
-
 def train_objective(
-    trial: optuna.Trial,
+    trial: optuna.trial.Trial,
     regressor: str,
     X: pd.DataFrame,
     y: pd.DataFrame,
@@ -590,7 +728,7 @@ def train_objective(
     y_test = y_test.iloc[-test_window:]
     test_weights = test_weights[-test_window:]
 
-    model = train_regressor(
+    model = fit_regressor(
         regressor=regressor,
         X=X,
         y=y,
@@ -609,7 +747,9 @@ def train_objective(
     return error
 
 
-def get_optuna_study_model_parameters(trial: optuna.Trial, regressor: str) -> dict:
+def get_optuna_study_model_parameters(
+    trial: optuna.trial.Trial, regressor: str
+) -> dict:
     study_model_parameters = {
         "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
         "min_child_weight": trial.suggest_int("min_child_weight", 1, 200),
@@ -639,7 +779,7 @@ def get_optuna_study_model_parameters(trial: optuna.Trial, regressor: str) -> di
 
 
 def hp_objective(
-    trial: optuna.Trial,
+    trial: optuna.trial.Trial,
     regressor: str,
     X: pd.DataFrame,
     y: pd.DataFrame,
@@ -652,7 +792,7 @@ def hp_objective(
     study_model_parameters = get_optuna_study_model_parameters(trial, regressor)
     model_training_parameters = {**model_training_parameters, **study_model_parameters}
 
-    model = train_regressor(
+    model = fit_regressor(
         regressor=regressor,
         X=X,
         y=y,
@@ -671,13 +811,158 @@ def hp_objective(
     return error
 
 
+def dynamic_zigzag(
+    df: pd.DataFrame,
+    period: int = 14,
+    natr: bool = True,
+    ratio: float = 1.0,
+) -> tuple[list[int], list[float], list[int]]:
+    """
+    Calculate the ZigZag indicator for an OHLCV DataFrame with a dynamic ATR/NATR-based threshold.
+
+    Parameters:
+    df (pd.DataFrame): OHLCV DataFrame.
+    period (int): Period for ATR/NATR calculation (default: 14).
+    natr (bool): Use NATR (True) or ATR (False) (default: True).
+    ratio (float): Ratio applied to the ATR/NATR value to form the dynamic threshold (default: 1.0).
+
+    Returns:
+    tuple: Lists of indices, extrema, and directions.
+    """
+    if df.empty:
+        return [], [], []
+
+    if natr:
+        thresholds = ta.NATR(df, timeperiod=period)
+    else:
+        thresholds = ta.ATR(df, timeperiod=period)
+    thresholds = thresholds.ffill().bfill() * ratio
+
+    indices = []
+    extrema = []
+    directions = []
+
+    first_high = df["high"].iloc[0]
+    first_low = df["low"].iloc[0]
+    first_threshold = thresholds.iloc[0]
+
+    if natr:
+        first_move = (first_high - first_low) / first_low
+    else:
+        first_move = first_high - first_low
+    if first_move >= first_threshold:
+        current_dir = 1
+        current_extreme = first_high
+    else:
+        current_dir = -1
+        current_extreme = first_low
+    current_extreme_idx = df.index[0]
+
+    indices.append(current_extreme_idx)
+    extrema.append(current_extreme)
+    directions.append(current_dir)
+    last_idx = current_extreme_idx
+
+    for i in range(1, len(df)):
+        current_idx = df.index[i]
+        h = df.at[current_idx, "high"]
+        l = df.at[current_idx, "low"]
+        threshold = thresholds.iloc[i]
+
+        if current_dir == 1:  # Looking for higher high
+            if h > current_extreme:
+                current_extreme = h
+                current_extreme_idx = current_idx
+                continue
+            if natr:
+                reversal = (current_extreme - l) / current_extreme >= threshold
+            else:
+                reversal = (current_extreme - l) >= threshold
+            if reversal:
+                if current_extreme_idx != last_idx:
+                    indices.append(current_extreme_idx)
+                    extrema.append(current_extreme)
+                    directions.append(current_dir)
+                    last_idx = current_extreme_idx
+
+                current_dir = -1
+                current_extreme = l
+                current_extreme_idx = current_idx
+
+        elif current_dir == -1:  # Looking for lower low
+            if l < current_extreme:
+                current_extreme = l
+                current_extreme_idx = current_idx
+                continue
+            if natr:
+                reversal = (h - current_extreme) / current_extreme >= threshold
+            else:
+                reversal = (h - current_extreme) >= threshold
+            if reversal:
+                if current_extreme_idx != last_idx:
+                    indices.append(current_extreme_idx)
+                    extrema.append(current_extreme)
+                    directions.append(current_dir)
+                    last_idx = current_extreme_idx
+
+                current_dir = 1
+                current_extreme = h
+                current_extreme_idx = current_idx
+
+    if current_extreme_idx != last_idx:
+        indices.append(current_extreme_idx)
+        extrema.append(current_extreme)
+        directions.append(current_dir)
+
+    return indices[1:], extrema[1:], directions[1:]
+
+
+def label_objective(
+    trial: optuna.trial.Trial,
+    df: pd.DataFrame,
+    fit_live_predictions_candles: int,
+    candles_step: int,
+) -> tuple[float, float]:
+    min_label_period_candles: int = round_to_nearest(
+        max(fit_live_predictions_candles // 16, 20), candles_step
+    )
+    max_label_period_candles: int = round_to_nearest(
+        max(fit_live_predictions_candles // 4, min_label_period_candles),
+        candles_step,
+    )
+    label_period_candles = trial.suggest_int(
+        "label_period_candles",
+        min_label_period_candles,
+        max_label_period_candles,
+        step=candles_step,
+    )
+    label_natr_ratio = trial.suggest_float("label_natr_ratio", 0.05, 0.1)
+
+    _, peak_values, _ = dynamic_zigzag(
+        df,
+        period=label_period_candles,
+        ratio=label_natr_ratio,
+    )
+
+    if len(peak_values) < 2:
+        return -float("inf"), -float("inf")
+
+    previous_value = peak_values[0]
+    peak_ranges = []
+    for peak_value in peak_values[1:]:
+        peak_ranges.append(abs(peak_value - previous_value))
+        previous_value = peak_value
+
+    return np.mean(peak_ranges), len(peak_ranges)
+
+
 def smoothed_max(series: pd.Series, temperature=1.0) -> float:
     data_array = series.to_numpy()
     if data_array.size == 0:
         return np.nan
     if temperature < 0:
         raise ValueError("temperature must be non-negative.")
-    if temperature == 0:
+    if np.isclose(temperature, 0):
         return data_array.max()
     return sp.special.logsumexp(temperature * data_array) / temperature
 
@@ -688,6 +973,19 @@ def smoothed_min(series: pd.Series, temperature=1.0) -> float:
         return np.nan
     if temperature < 0:
         raise ValueError("temperature must be non-negative.")
-    if temperature == 0:
+    if np.isclose(temperature, 0):
         return data_array.min()
     return -sp.special.logsumexp(-temperature * data_array) / temperature
+
+
+def round_to_nearest(value: float, step: int) -> int:
+    """
+    Round a value to the nearest multiple of a given step.
+    :param value: The value to round.
+    :param step: The step size to round to (must be non-zero).
+    :return: The rounded value.
+    :raises ValueError: If step is zero.
+    """
+    if step == 0:
+        raise ValueError("step must be non-zero")
+    return int(round(value / step) * step)
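
For illustration only (not part of this commit): a minimal, self-contained sketch of the label tuning idea wired up above, where label_objective() scores a zigzag parameterisation by mean swing size and swing count, and a multi-objective Optuna study maximises both. A fixed fractional threshold and synthetic data stand in for the NATR-driven dynamic_zigzag() and the pair dataframe; simple_zigzag and toy_label_objective are hypothetical names.

import numpy as np
import optuna
import pandas as pd


def simple_zigzag(df: pd.DataFrame, threshold: float) -> list[float]:
    """Alternating swing extrema of the close series for a fixed fractional reversal threshold."""
    extrema: list[float] = []
    direction = 0  # 0: undecided, 1: tracking a swing high, -1: tracking a swing low
    extreme = df["close"].iloc[0]
    for price in df["close"].iloc[1:]:
        if direction >= 0 and price > extreme:
            extreme, direction = price, 1
        elif direction <= 0 and price < extreme:
            extreme, direction = price, -1
        elif direction == 1 and (extreme - price) / extreme >= threshold:
            extrema.append(extreme)  # confirmed swing high, start tracking a low
            extreme, direction = price, -1
        elif direction == -1 and (price - extreme) / extreme >= threshold:
            extrema.append(extreme)  # confirmed swing low, start tracking a high
            extreme, direction = price, 1
    return extrema


def toy_label_objective(trial: optuna.trial.Trial, df: pd.DataFrame) -> tuple[float, float]:
    threshold = trial.suggest_float("threshold", 0.005, 0.05)
    extrema = simple_zigzag(df, threshold)
    if len(extrema) < 2:
        return -float("inf"), -float("inf")
    swings = np.abs(np.diff(extrema))
    # Two competing objectives, both maximised: larger swings vs. more of them.
    return float(np.mean(swings)), float(len(swings))


rng = np.random.default_rng(42)
ohlc = pd.DataFrame({"close": 100 * np.exp(np.cumsum(rng.normal(0, 0.01, 500)))})
study = optuna.create_study(
    directions=[
        optuna.study.StudyDirection.MAXIMIZE,
        optuna.study.StudyDirection.MAXIMIZE,
    ]
)
study.optimize(lambda trial: toy_label_objective(trial, ohlc), n_trials=50)
print(len(study.best_trials), study.best_trials[0].params)

With two maximised objectives the study yields a Pareto front (study.best_trials) rather than a single best trial, which is why the commit adds a selection rule, get_multi_objective_study_best_trial(), to pick one trial from that front; round_to_nearest() (moved below smoothed_min()) is what keeps the suggested label_period_candles bounds on the candles_step grid, e.g. round_to_nearest(23, 5) == 25.
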
diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py
index 0547d93491534a43cc32d8cd11cea6f520bcce7b..98fff8c9d8c448e43b22d729aa49c7062be854f4 100644
@@ -1,9 +1,9 @@
+import json
 import logging
 from functools import reduce, cached_property
 import datetime
 import math
 from pathlib import Path
-from statistics import geometric_mean
 import talib.abstract as ta
 from pandas import DataFrame, Series, isna
 from typing import Optional
@@ -58,27 +58,19 @@ class QuickAdapterV3(IStrategy):
     INTERFACE_VERSION = 3
 
     def version(self) -> str:
-        return "3.2.16"
+        return "3.3.0"
 
     timeframe = "5m"
 
     stoploss = -0.02
     use_custom_stoploss = True
 
-    @cached_property
-    def trailing_stoploss_natr_ratio(self) -> float:
-        return self.config.get("trailing_stoploss_natr_ratio", 0.025)
-
     # Trailing stop:
     trailing_stop = False
     trailing_stop_positive = 0.01
     trailing_stop_positive_offset = 0.011
     trailing_only_offset_is_reached = True
 
-    @cached_property
-    def label_natr_ratio(self) -> float:
-        return self.freqai_info["feature_parameters"].get("label_natr_ratio", 0.075)
-
     @cached_property
     def entry_natr_ratio(self) -> float:
         return self.config.get("entry_pricing", {}).get("entry_natr_ratio", 0.00025)
@@ -181,6 +173,16 @@ class QuickAdapterV3(IStrategy):
             / "models"
             / f"{self.freqai_info.get('identifier', 'no_id_provided')}"
         )
+        self._label_params: dict[str, dict] = {}
+        for pair in self.pairs:
+            self._label_params[pair] = self.optuna_load_best_params(pair, "label") or {
+                "label_period_candles": self.freqai_info["feature_parameters"].get(
+                    "label_period_candles", 50
+                ),
+                "label_natr_ratio": self.freqai_info["feature_parameters"].get(
+                    "label_natr_ratio", 0.075
+                ),
+            }
 
     def feature_engineering_expand_all(
         self, dataframe: DataFrame, period: int, metadata: dict, **kwargs
@@ -352,12 +354,34 @@ class QuickAdapterV3(IStrategy):
         return dataframe
 
     def get_label_period_candles(self, pair: str) -> int:
+        label_period_candles = self._label_params.get(pair).get("label_period_candles")
+        if label_period_candles:
+            return label_period_candles
         return self.freqai_info["feature_parameters"].get("label_period_candles", 50)
 
+    def set_label_period_candles(self, pair: str, label_period_candles: int):
+        if label_period_candles:
+            self._label_params[pair]["label_period_candles"] = label_period_candles
+
+    def get_label_natr_ratio(self, pair: str) -> float:
+        label_natr_ratio = self._label_params.get(pair).get("label_natr_ratio")
+        if label_natr_ratio:
+            return label_natr_ratio
+        return self.freqai_info["feature_parameters"].get("label_natr_ratio", 0.075)
+
+    def set_label_natr_ratio(self, pair: str, label_natr_ratio: float):
+        if label_natr_ratio:
+            self._label_params[pair]["label_natr_ratio"] = label_natr_ratio
+
+    def get_trailing_stoploss_natr_ratio(self, pair: str) -> float:
+        return self.get_label_natr_ratio(pair) * 0.025
+
     def set_freqai_targets(self, dataframe: DataFrame, metadata: dict, **kwargs):
-        label_period_candles = self.get_label_period_candles(str(metadata.get("pair")))
+        pair = str(metadata.get("pair"))
         peak_indices, _, peak_directions = dynamic_zigzag(
-            dataframe, period=label_period_candles, ratio=self.label_natr_ratio
+            dataframe,
+            period=self.get_label_period_candles(pair),
+            ratio=self.get_label_natr_ratio(pair),
         )
         dataframe[EXTREMA_COLUMN] = 0
         for peak_idx, peak_dir in zip(peak_indices, peak_directions):
@@ -381,9 +405,11 @@ class QuickAdapterV3(IStrategy):
 
         pair = str(metadata.get("pair"))
 
-        label_period_candles = self.get_label_period_candles(pair)
+        self.set_label_period_candles(pair, dataframe["label_period_candles"].iloc[-1])
+        self.set_label_natr_ratio(pair, dataframe["label_natr_ratio"].iloc[-1])
+
         dataframe["natr_label_period_candles"] = ta.NATR(
-            dataframe, timeperiod=label_period_candles
+            dataframe, timeperiod=self.get_label_period_candles(pair)
         )
 
         dataframe["minima_threshold"] = dataframe[MINIMA_THRESHOLD_COLUMN]
@@ -471,7 +497,7 @@ class QuickAdapterV3(IStrategy):
         return (
             current_rate
             * current_natr
-            * self.trailing_stoploss_natr_ratio
+            * self.get_trailing_stoploss_natr_ratio(trade.pair)
             * (1 / math.log10(1 + 0.25 * trade_duration_candles))
         )
 
@@ -488,14 +514,18 @@ class QuickAdapterV3(IStrategy):
         if isna(current_natr):
             return None
         trade_take_profit_distance = (
-            trade.open_rate * entry_natr * self.trailing_stoploss_natr_ratio
+            trade.open_rate
+            * entry_natr
+            * self.get_trailing_stoploss_natr_ratio(trade.pair)
         )
         return max(
             trade_take_profit_distance,
-            geometric_mean(
+            np.mean(
                 [
                     trade_take_profit_distance,
-                    current_rate * current_natr * self.trailing_stoploss_natr_ratio,
+                    current_rate
+                    * current_natr
+                    * self.get_trailing_stoploss_natr_ratio(trade.pair),
                 ]
             )
             * math.log10(9 + trade_duration_candles)
@@ -519,7 +549,7 @@ class QuickAdapterV3(IStrategy):
         stoploss_distance = self.get_stoploss_distance(df, trade, current_rate)
         if isna(stoploss_distance):
             return None
-        if stoploss_distance == 0:
+        if np.isclose(stoploss_distance, 0):
             return None
         sign = 1 if trade.is_short else -1
         return stoploss_from_absolute(
@@ -567,7 +597,7 @@ class QuickAdapterV3(IStrategy):
         take_profit_distance = self.get_take_profit_distance(df, trade, current_rate)
         if isna(take_profit_distance):
             return None
-        if take_profit_distance == 0:
+        if np.isclose(take_profit_distance, 0):
             return None
         if trade.is_short:
             take_profit_price = trade.open_rate - take_profit_distance
@@ -682,3 +712,13 @@ class QuickAdapterV3(IStrategy):
             extrema_smoothing,
             smoothing_methods["gaussian"],
         )
+
+    def optuna_load_best_params(self, pair: str, namespace: str) -> Optional[dict]:
+        best_params_path = Path(
+            self.models_full_path
+            / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json"
+        )
+        if best_params_path.is_file():
+            with best_params_path.open("r", encoding="utf-8") as read_file:
+                return json.load(read_file)
+        return None
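
For reference, a minimal sketch (not part of this commit) of the per-pair JSON round trip behind optuna_load_best_params() above for the "label" namespace: the regressor persists the study's best parameters (via optuna_save_best_params(), not shown in this hunk) and the strategy reads them back on startup. The identifier directory below is hypothetical; the values mirror the config-template defaults added in this commit.

import json
from pathlib import Path

models_full_path = Path("user_data/models/example-identifier")  # hypothetical identifier
pair = "BTC/USDT"
best_params = {"label_period_candles": 100, "label_natr_ratio": 0.075}

# File name pattern taken from optuna_load_best_params(): optuna-<namespace>-best-params-<base>.json
best_params_path = models_full_path / f"optuna-label-best-params-{pair.split('/')[0]}.json"
best_params_path.parent.mkdir(parents=True, exist_ok=True)
with best_params_path.open("w", encoding="utf-8") as write_file:
    json.dump(best_params, write_file, indent=2)

# What QuickAdapterV3.optuna_load_best_params("BTC/USDT", "label") would then return:
with best_params_path.open("r", encoding="utf-8") as read_file:
    print(json.load(read_file))  # {'label_period_candles': 100, 'label_natr_ratio': 0.075}
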
diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py
index 82c16fdfca764f0f889b05accf45f49c9491ce1a..160b57faedad7110f9a0ac741ad1e2a85e216d16 100644
@@ -318,10 +318,6 @@ def zigzag(
     extrema = []
     directions = []
 
-    current_dir = 0
-    current_extreme = None
-    current_extreme_idx = None
-
     first_high = df["high"].iloc[0]
     first_low = df["low"].iloc[0]
 
@@ -406,19 +402,15 @@ def dynamic_zigzag(
         thresholds = ta.NATR(df, timeperiod=period)
     else:
         thresholds = ta.ATR(df, timeperiod=period)
-    thresholds = thresholds.ffill().bfill()
+    thresholds = thresholds.ffill().bfill() * ratio
 
     indices = []
     extrema = []
     directions = []
 
-    current_dir = 0
-    current_extreme = None
-    current_extreme_idx = None
-
     first_high = df["high"].iloc[0]
     first_low = df["low"].iloc[0]
-    first_threshold = thresholds.iloc[0] * ratio
+    first_threshold = thresholds.iloc[0]
 
     if natr:
         first_move = (first_high - first_low) / first_low
@@ -441,47 +433,47 @@ def dynamic_zigzag(
         current_idx = df.index[i]
         h = df.at[current_idx, "high"]
         l = df.at[current_idx, "low"]
-        threshold = thresholds.iloc[i] * ratio
+        threshold = thresholds.iloc[i]
 
         if current_dir == 1:  # Looking for higher high
             if h > current_extreme:
                 current_extreme = h
                 current_extreme_idx = current_idx
+                continue
+            if natr:
+                reversal = (current_extreme - l) / current_extreme >= threshold
             else:
-                if natr:
-                    reversal = (current_extreme - l) / current_extreme >= threshold
-                else:
-                    reversal = (current_extreme - l) >= threshold
-                if reversal:
-                    if current_extreme_idx != last_idx:
-                        indices.append(current_extreme_idx)
-                        extrema.append(current_extreme)
-                        directions.append(current_dir)
-                        last_idx = current_extreme_idx
-
-                    current_dir = -1
-                    current_extreme = l
-                    current_extreme_idx = current_idx
+                reversal = (current_extreme - l) >= threshold
+            if reversal:
+                if current_extreme_idx != last_idx:
+                    indices.append(current_extreme_idx)
+                    extrema.append(current_extreme)
+                    directions.append(current_dir)
+                    last_idx = current_extreme_idx
+
+                current_dir = -1
+                current_extreme = l
+                current_extreme_idx = current_idx
 
         elif current_dir == -1:  # Looking for lower low
             if l < current_extreme:
                 current_extreme = l
                 current_extreme_idx = current_idx
+                continue
+            if natr:
+                reversal = (h - current_extreme) / current_extreme >= threshold
             else:
-                if natr:
-                    reversal = (h - current_extreme) / current_extreme >= threshold
-                else:
-                    reversal = (h - current_extreme) >= threshold
-                if reversal:
-                    if current_extreme_idx != last_idx:
-                        indices.append(current_extreme_idx)
-                        extrema.append(current_extreme)
-                        directions.append(current_dir)
-                        last_idx = current_extreme_idx
-
-                    current_dir = 1
-                    current_extreme = h
-                    current_extreme_idx = current_idx
+                reversal = (h - current_extreme) >= threshold
+            if reversal:
+                if current_extreme_idx != last_idx:
+                    indices.append(current_extreme_idx)
+                    extrema.append(current_extreme)
+                    directions.append(current_dir)
+                    last_idx = current_extreme_idx
+
+                current_dir = 1
+                current_extreme = h
+                current_extreme_idx = current_idx
 
     if current_extreme_idx != last_idx:
         indices.append(current_extreme_idx)