Piment Noir Git Repositories - freqai-strategies.git/commitdiff
Merge branch 'main' of github.com:jerome-benoit/freqai-strategies into perf/hpo_n_est...
author    Jérôme Benoit <jerome.benoit@piment-noir.org>
          Sat, 19 Jul 2025 19:06:12 +0000 (21:06 +0200)
committer Jérôme Benoit <jerome.benoit@piment-noir.org>
          Sat, 19 Jul 2025 19:06:12 +0000 (21:06 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
quickadapter/user_data/config-template.json
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py

index 0000000000000000000000000000000000000000,c48c47201ca224929f926d9a7aa5ea76b71f059b..c9d8fdfdca814e76dc778e4e41e3fd4ca36c3bb9
mode 000000,100644..100644
--- /dev/null
+++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
@@@ -1,0 -1,1743 +1,1751 @@@
+ import copy
+ from enum import IntEnum
+ import logging
+ import json
+ import math
+ import random
+ from statistics import median
+ import time
+ import numpy as np
+ import pandas as pd
+ import scipy as sp
+ import optuna
+ import sklearn.cluster
+ import sklearn.metrics
+ import sklearn.neighbors
+ import warnings
+ import talib.abstract as ta
+ from functools import cached_property, lru_cache
+ from typing import Any, Callable, Optional
+ from pathlib import Path
+ from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
+ from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
+ debug = False
+ TEST_SIZE = 0.1
+ EXTREMA_COLUMN = "&s-extrema"
+ MAXIMA_THRESHOLD_COLUMN = "&s-maxima_threshold"
+ MINIMA_THRESHOLD_COLUMN = "&s-minima_threshold"
+ warnings.simplefilter(action="ignore", category=FutureWarning)
+ logger = logging.getLogger(__name__)
+ class QuickAdapterRegressorV3(BaseRegressionModel):
+     """
+     The following freqaimodel is released to sponsors of the non-profit FreqAI open-source project.
+     If you find the FreqAI project useful, please consider supporting it by becoming a sponsor.
+     We use sponsor money to help stimulate new features and to pay for running these public
+     experiments, with the objective of helping the community make smarter choices in their
+     ML journey.
+     This freqaimodel is experimental (as with all models released to sponsors). Do *not* expect
+     returns. The goal is to demonstrate gratitude to people who support the project and to
+     help them find a good starting point for their own creativity.
+     If you have questions, please direct them to our discord: https://discord.gg/xE4RMg4QYw
+     https://github.com/sponsors/robcaulk
+     """
+     version = "3.7.94"
+     @cached_property
+     def _optuna_config(self) -> dict[str, Any]:
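+         # User settings from the freqai.optuna_hyperopt section override these
+         # defaults; n_jobs is additionally clamped to a quarter of the available
+         # system threads.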
+         optuna_default_config = {
+             "enabled": False,
+             "n_jobs": min(
+                 self.freqai_info.get("optuna_hyperopt", {}).get("n_jobs", 1),
+                 max(int(self.max_system_threads / 4), 1),
+             ),
+             "storage": "file",
+             "continuous": True,
+             "warm_start": True,
+             "n_startup_trials": 15,
+             "n_trials": 36,
+             "timeout": 7200,
+             "label_candles_step": 4,
+             "train_candles_step": 10,
+             "expansion_factor": 0.4,
+             "seed": 1,
+         }
+         return {
+             **optuna_default_config,
+             **self.freqai_info.get("optuna_hyperopt", {}),
+         }
+     @property
+     def _optuna_label_candle_pool_full(self) -> list[int]:
+         if not hasattr(self, "pairs") or not self.pairs:
+             raise RuntimeError(
+                 "Failed to initialize optuna label candle pool full: pairs property is not defined or empty"
+             )
+         n_pairs = len(self.pairs)
+         label_frequency_candles = max(
+             2, 2 * n_pairs, int(self.ft_params.get("label_frequency_candles", 12))
+         )
+         cache_key = label_frequency_candles
+         if cache_key not in self._optuna_label_candle_pool_full_cache:
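+             # Build a symmetric pool of candidate candle counts centered on
+             # label_frequency_candles, spanning ± half that window (clamped to >= 1).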
+             half_label_frequency_candles = int(label_frequency_candles / 2)
+             min_offset = -half_label_frequency_candles
+             max_offset = half_label_frequency_candles
+             self._optuna_label_candle_pool_full_cache[cache_key] = [
+                 max(1, label_frequency_candles + offset)
+                 for offset in range(min_offset, max_offset + 1)
+             ]
+         return copy.deepcopy(self._optuna_label_candle_pool_full_cache[cache_key])
+     def __init__(self, **kwargs):
+         super().__init__(**kwargs)
+         self.pairs = self.config.get("exchange", {}).get("pair_whitelist")
+         if not self.pairs:
+             raise ValueError(
+                 "FreqAI model requires the StaticPairList method in the pairlists configuration and 'pair_whitelist' defined in the exchange configuration section"
+             )
+         if (
+             self.freqai_info.get("identifier") is None
+             or self.freqai_info.get("identifier").strip() == ""
+         ):
+             raise ValueError(
+                 "FreqAI model requires 'identifier' defined in the freqai configuration section"
+             )
+         self._optuna_hyperopt: Optional[bool] = (
+             self.freqai_info.get("enabled", False)
+             and self._optuna_config.get("enabled")
+             and self.data_split_parameters.get("test_size", TEST_SIZE) > 0
+         )
+         self._optuna_hp_value: dict[str, float] = {}
+         self._optuna_train_value: dict[str, float] = {}
+         self._optuna_label_values: dict[str, list] = {}
+         self._optuna_hp_params: dict[str, dict[str, Any]] = {}
+         self._optuna_train_params: dict[str, dict[str, Any]] = {}
+         self._optuna_label_params: dict[str, dict[str, Any]] = {}
+         self._optuna_label_candle_pool_full_cache: dict[int, list[int]] = {}
+         self.init_optuna_label_candle_pool()
+         self._optuna_label_candle: dict[str, int] = {}
+         self._optuna_label_candles: dict[str, int] = {}
+         self._optuna_label_incremented_pairs: list[str] = []
+         for pair in self.pairs:
+             self._optuna_hp_value[pair] = -1
+             self._optuna_train_value[pair] = -1
+             self._optuna_label_values[pair] = [-1, -1]
+             self._optuna_hp_params[pair] = (
+                 self.optuna_load_best_params(pair, "hp")
+                 if self.optuna_load_best_params(pair, "hp")
+                 else {}
+             )
+             self._optuna_train_params[pair] = (
+                 self.optuna_load_best_params(pair, "train")
+                 if self.optuna_load_best_params(pair, "train")
+                 else {}
+             )
+             self._optuna_label_params[pair] = (
+                 self.optuna_load_best_params(pair, "label")
+                 if self.optuna_load_best_params(pair, "label")
+                 else {
+                     "label_period_candles": self.ft_params.get(
+                         "label_period_candles", 50
+                     ),
+                     "label_natr_ratio": float(
+                         self.ft_params.get("label_natr_ratio", 6.0)
+                     ),
+                 }
+             )
+             self.set_optuna_label_candle(pair)
+             self._optuna_label_candles[pair] = 0
+         logger.info(
+             f"Initialized {self.__class__.__name__} {self.freqai_info.get('regressor', 'xgboost')} regressor model version {self.version}"
+         )
+     def get_optuna_params(self, pair: str, namespace: str) -> dict[str, Any]:
+         if namespace == "hp":
+             params = self._optuna_hp_params.get(pair)
+         elif namespace == "train":
+             params = self._optuna_train_params.get(pair)
+         elif namespace == "label":
+             params = self._optuna_label_params.get(pair)
+         else:
+             raise ValueError(f"Invalid namespace: {namespace}")
+         return params
+     def set_optuna_params(
+         self, pair: str, namespace: str, params: dict[str, Any]
+     ) -> None:
+         if namespace == "hp":
+             self._optuna_hp_params[pair] = params
+         elif namespace == "train":
+             self._optuna_train_params[pair] = params
+         elif namespace == "label":
+             self._optuna_label_params[pair] = params
+         else:
+             raise ValueError(f"Invalid namespace: {namespace}")
+     def get_optuna_value(self, pair: str, namespace: str) -> float:
+         if namespace == "hp":
+             value = self._optuna_hp_value.get(pair)
+         elif namespace == "train":
+             value = self._optuna_train_value.get(pair)
+         else:
+             raise ValueError(f"Invalid namespace: {namespace}")
+         return value
+     def set_optuna_value(self, pair: str, namespace: str, value: float) -> None:
+         if namespace == "hp":
+             self._optuna_hp_value[pair] = value
+         elif namespace == "train":
+             self._optuna_train_value[pair] = value
+         else:
+             raise ValueError(f"Invalid namespace: {namespace}")
+     def get_optuna_values(self, pair: str, namespace: str) -> list:
+         if namespace == "label":
+             values = self._optuna_label_values.get(pair)
+         else:
+             raise ValueError(f"Invalid namespace: {namespace}")
+         return values
+     def set_optuna_values(self, pair: str, namespace: str, values: list) -> None:
+         if namespace == "label":
+             self._optuna_label_values[pair] = values
+         else:
+             raise ValueError(f"Invalid namespace: {namespace}")
+     def init_optuna_label_candle_pool(self) -> None:
+         optuna_label_candle_pool_full = self._optuna_label_candle_pool_full
+         if len(optuna_label_candle_pool_full) == 0:
+             raise RuntimeError("Failed to initialize optuna label candle pool full")
+         self._optuna_label_candle_pool = optuna_label_candle_pool_full
+         random.shuffle(self._optuna_label_candle_pool)
+         if len(self._optuna_label_candle_pool) == 0:
+             raise RuntimeError("Failed to initialize optuna label candle pool")
+     def set_optuna_label_candle(self, pair: str) -> None:
+         if len(self._optuna_label_candle_pool) == 0:
+             logger.warning(
+                 "Optuna label candle pool is empty, reinitializing it ("
+                 f"{self._optuna_label_candle_pool=}, "
+                 f"{self._optuna_label_candle_pool_full=}, "
+                 f"{self._optuna_label_candle.values()=}, "
+                 f"{self._optuna_label_candles.values()=}, "
+                 f"{self._optuna_label_incremented_pairs=})"
+             )
+             self.init_optuna_label_candle_pool()
+         optuna_label_candle_pool = copy.deepcopy(self._optuna_label_candle_pool)
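+         # Drop candle counts another pair is about to reach so that label
+         # optimizations stay staggered across pairs.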
+         for p in self.pairs:
+             if p == pair:
+                 continue
+             optuna_label_candle = self._optuna_label_candle.get(p)
+             optuna_label_candles = self._optuna_label_candles.get(p)
+             if optuna_label_candle is not None and optuna_label_candles is not None:
+                 if (
+                     self._optuna_label_incremented_pairs
+                     and p not in self._optuna_label_incremented_pairs
+                 ):
+                     optuna_label_candles += 1
+                 remaining_candles = optuna_label_candle - optuna_label_candles
+                 if remaining_candles in optuna_label_candle_pool:
+                     optuna_label_candle_pool.remove(remaining_candles)
+         optuna_label_candle = optuna_label_candle_pool.pop()
+         self._optuna_label_candle[pair] = optuna_label_candle
+         self._optuna_label_candle_pool.remove(optuna_label_candle)
+         optuna_label_available_candles = (
+             set(self._optuna_label_candle_pool_full)
+             - set(self._optuna_label_candle_pool)
+             - set(self._optuna_label_candle.values())
+         )
+         if len(optuna_label_available_candles) > 0:
+             self._optuna_label_candle_pool.extend(optuna_label_available_candles)
+             random.shuffle(self._optuna_label_candle_pool)
+     def fit(
+         self, data_dictionary: dict[str, Any], dk: FreqaiDataKitchen, **kwargs
+     ) -> Any:
+         """
+         User sets up the training and test data to fit their desired model here
+         :param data_dictionary: the dictionary constructed by DataHandler to hold
+                                 all the training and test data/labels.
+         :param dk: the FreqaiDataKitchen object
+         """
+         X = data_dictionary.get("train_features")
+         y = data_dictionary.get("train_labels")
+         train_weights = data_dictionary.get("train_weights")
+         X_test = data_dictionary.get("test_features")
+         y_test = data_dictionary.get("test_labels")
+         test_weights = data_dictionary.get("test_weights")
+         model_training_parameters = self.model_training_parameters
+         start = time.time()
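+         # Two-stage hyperparameter optimization: first the regressor parameters
+         # ("hp" namespace), then the train/test window sizes ("train" namespace).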
+         if self._optuna_hyperopt:
+             self.optuna_optimize(
+                 pair=dk.pair,
+                 namespace="hp",
+                 objective=lambda trial: hp_objective(
+                     trial,
+                     self.freqai_info.get("regressor", "xgboost"),
+                     X,
+                     y,
+                     train_weights,
+                     X_test,
+                     y_test,
+                     test_weights,
+                     self.get_optuna_params(dk.pair, "hp"),
+                     model_training_parameters,
+                     self._optuna_config.get("expansion_factor"),
+                 ),
+                 direction=optuna.study.StudyDirection.MINIMIZE,
+             )
+             optuna_hp_params = self.get_optuna_params(dk.pair, "hp")
+             if optuna_hp_params:
+                 model_training_parameters = {
+                     **model_training_parameters,
+                     **optuna_hp_params,
+                 }
+             self.optuna_optimize(
+                 pair=dk.pair,
+                 namespace="train",
+                 objective=lambda trial: train_objective(
+                     trial,
+                     self.freqai_info.get("regressor", "xgboost"),
+                     X,
+                     y,
+                     train_weights,
+                     X_test,
+                     y_test,
+                     test_weights,
+                     self.data_split_parameters.get("test_size", TEST_SIZE),
+                     self.freqai_info.get("fit_live_predictions_candles", 100),
+                     self._optuna_config.get("train_candles_step"),
+                     model_training_parameters,
+                 ),
+                 direction=optuna.study.StudyDirection.MINIMIZE,
+             )
+             optuna_train_params = self.get_optuna_params(dk.pair, "train")
+             if optuna_train_params:
+                 value: float = self.get_optuna_value(dk.pair, "train")
+                 if isinstance(value, float) and np.isfinite(value):
+                     train_window = optuna_train_params.get("train_period_candles")
+                     if isinstance(train_window, int) and train_window > 0:
+                         X = X.iloc[-train_window:]
+                         y = y.iloc[-train_window:]
+                         train_weights = train_weights[-train_window:]
+                     test_window = optuna_train_params.get("test_period_candles")
+                     if isinstance(test_window, int) and test_window > 0:
+                         X_test = X_test.iloc[-test_window:]
+                         y_test = y_test.iloc[-test_window:]
+                         test_weights = test_weights[-test_window:]
+         eval_set, eval_weights = self.eval_set_and_weights(X_test, y_test, test_weights)
+         model = fit_regressor(
+             regressor=self.freqai_info.get("regressor", "xgboost"),
+             X=X,
+             y=y,
+             train_weights=train_weights,
+             eval_set=eval_set,
+             eval_weights=eval_weights,
+             model_training_parameters=model_training_parameters,
+             init_model=self.get_init_model(dk.pair),
+         )
+         time_spent = time.time() - start
+         self.dd.update_metric_tracker("fit_time", time_spent, dk.pair)
+         return model
+     def optuna_throttle_callback(
+         self,
+         pair: str,
+         namespace: str,
+         callback: Callable[[], None],
+     ) -> None:
+         if namespace != "label":
+             raise ValueError(f"Invalid namespace: {namespace}")
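+         # Throttle: run the callback only once the per-pair candle budget
+         # assigned by set_optuna_label_candle() has elapsed.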
+         self._optuna_label_candles[pair] += 1
+         if pair not in self._optuna_label_incremented_pairs:
+             self._optuna_label_incremented_pairs.append(pair)
+         optuna_label_remaining_candles = self._optuna_label_candle.get(
+             pair
+         ) - self._optuna_label_candles.get(pair)
+         if optuna_label_remaining_candles <= 0:
+             try:
+                 callback()
+             except Exception as e:
+                 logger.error(
+                     f"Error executing optuna {pair} {namespace} callback: {str(e)}",
+                     exc_info=True,
+                 )
+             finally:
+                 self.set_optuna_label_candle(pair)
+                 self._optuna_label_candles[pair] = 0
+         else:
+             logger.info(
+                 f"Optuna {pair} {namespace} callback throttled, still {optuna_label_remaining_candles} candles to go"
+             )
+         if len(self._optuna_label_incremented_pairs) >= len(self.pairs):
+             self._optuna_label_incremented_pairs = []
+     def fit_live_predictions(self, dk: FreqaiDataKitchen, pair: str) -> None:
+         warmed_up = True
+         fit_live_predictions_candles = self.freqai_info.get(
+             "fit_live_predictions_candles", 100
+         )
+         if self._optuna_hyperopt:
+             self.optuna_throttle_callback(
+                 pair=pair,
+                 namespace="label",
+                 callback=lambda: self.optuna_optimize(
+                     pair=pair,
+                     namespace="label",
+                     objective=lambda trial: label_objective(
+                         trial,
+                         self.data_provider.get_pair_dataframe(
+                             pair=pair, timeframe=self.config.get("timeframe")
+                         ),
+                         fit_live_predictions_candles,
+                         self._optuna_config.get("label_candles_step"),
+                     ),
+                     directions=[
+                         optuna.study.StudyDirection.MAXIMIZE,
+                         optuna.study.StudyDirection.MAXIMIZE,
+                     ],
+                 ),
+             )
+         if self.live:
+             if not hasattr(self, "exchange_candles"):
+                 self.exchange_candles = len(self.dd.model_return_values[pair].index)
+             candles_diff = len(self.dd.historic_predictions[pair].index) - (
+                 fit_live_predictions_candles + self.exchange_candles
+             )
+             if candles_diff < 0:
+                 logger.warning(
+                     f"{pair}: fit live predictions not warmed up yet, still {abs(candles_diff)} candles to go"
+                 )
+                 warmed_up = False
+         pred_df_full = (
+             self.dd.historic_predictions[pair]
+             .iloc[-fit_live_predictions_candles:]
+             .reset_index(drop=True)
+         )
+         if not warmed_up:
+             dk.data["extra_returns_per_train"][MINIMA_THRESHOLD_COLUMN] = -2
+             dk.data["extra_returns_per_train"][MAXIMA_THRESHOLD_COLUMN] = 2
+         else:
+             min_pred, max_pred = self.min_max_pred(
+                 pred_df_full,
+                 fit_live_predictions_candles,
+                 self.get_optuna_params(pair, "label").get("label_period_candles"),
+             )
+             dk.data["extra_returns_per_train"][MINIMA_THRESHOLD_COLUMN] = min_pred
+             dk.data["extra_returns_per_train"][MAXIMA_THRESHOLD_COLUMN] = max_pred
+         dk.data["labels_mean"], dk.data["labels_std"] = {}, {}
+         for label in dk.label_list + dk.unique_class_list:
+             pred_df_full_label = pred_df_full.get(label)
+             if pred_df_full_label is None or pred_df_full_label.dtype == object:
+                 continue
+             if not warmed_up:
+                 f = [0, 0]
+             else:
+                 f = sp.stats.norm.fit(pred_df_full_label)
+             dk.data["labels_mean"][label], dk.data["labels_std"][label] = f[0], f[1]
+         di_values = pred_df_full.get("DI_values")
+         # Fit a Weibull distribution to DI_values and derive the DI cutoff at the configured outlier_threshold quantile
+         if not warmed_up:
+             f = [0, 0, 0]
+             cutoff = 2
+         else:
+             f = sp.stats.weibull_min.fit(
+                 pd.to_numeric(di_values, errors="coerce").dropna()
+             )
+             cutoff = sp.stats.weibull_min.ppf(
+                 self.freqai_info.get("outlier_threshold", 0.999), *f
+             )
+         dk.data["DI_value_mean"] = di_values.mean()
+         dk.data["DI_value_std"] = di_values.std()
+         dk.data["extra_returns_per_train"]["DI_value_param1"] = f[0]
+         dk.data["extra_returns_per_train"]["DI_value_param2"] = f[1]
+         dk.data["extra_returns_per_train"]["DI_value_param3"] = f[2]
+         dk.data["extra_returns_per_train"]["DI_cutoff"] = cutoff
+         dk.data["extra_returns_per_train"]["label_period_candles"] = (
+             self.get_optuna_params(pair, "label").get("label_period_candles")
+         )
+         dk.data["extra_returns_per_train"]["label_natr_ratio"] = self.get_optuna_params(
+             pair, "label"
+         ).get("label_natr_ratio")
+         dk.data["extra_returns_per_train"]["hp_rmse"] = self.get_optuna_value(
+             pair, "hp"
+         )
+         dk.data["extra_returns_per_train"]["train_rmse"] = self.get_optuna_value(
+             pair, "train"
+         )
+     def eval_set_and_weights(
+         self, X_test: pd.DataFrame, y_test: pd.DataFrame, test_weights: np.ndarray
+     ) -> tuple[
+         Optional[list[tuple[pd.DataFrame, pd.DataFrame]]], Optional[list[np.ndarray]]
+     ]:
+         if self.data_split_parameters.get("test_size", TEST_SIZE) == 0:
+             eval_set = None
+             eval_weights = None
+         else:
+             eval_set = [(X_test, y_test)]
+             eval_weights = [test_weights]
+         return eval_set, eval_weights
+     def min_max_pred(
+         self,
+         pred_df: pd.DataFrame,
+         fit_live_predictions_candles: int,
+         label_period_candles: int,
+     ) -> tuple[float, float]:
+         temperature = float(
+             self.freqai_info.get("prediction_thresholds_temperature", 300.0)
+         )
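+         # Keep the most recent whole multiples (at least 2) of label_period_candles
+         # and derive temperature-smoothed extrema bounds from them.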
+         extrema = pred_df.get(EXTREMA_COLUMN).iloc[
+             -(
+                 max(2, int(fit_live_predictions_candles / label_period_candles))
+                 * label_period_candles
+             ) :
+         ]
+         min_pred = smoothed_min(extrema, temperature=temperature)
+         max_pred = smoothed_max(extrema, temperature=temperature)
+         return min_pred, max_pred
+     def get_multi_objective_study_best_trial(
+         self, namespace: str, study: optuna.study.Study
+     ) -> Optional[optuna.trial.FrozenTrial]:
+         if namespace != "label":
+             raise ValueError(f"Invalid namespace: {namespace}")
+         n_objectives = len(study.directions)
+         if n_objectives < 2:
+             raise ValueError(
+                 f"Multi-objective study must have at least 2 objectives, but got {n_objectives}"
+             )
+         if not QuickAdapterRegressorV3.optuna_study_has_best_trials(study):
+             return None
+         metrics = {
+             "braycurtis",
+             "canberra",
+             "chebyshev",
+             "cityblock",
+             "correlation",
+             "cosine",
+             "dice",
+             "euclidean",
+             "hamming",
+             "jaccard",
+             "jensenshannon",
+             "kulczynski1",
+             "mahalanobis",
+             "matching",
+             "minkowski",
+             "rogerstanimoto",
+             "russellrao",
+             "seuclidean",
+             "sokalmichener",
+             "sokalsneath",
+             "sqeuclidean",
+             "yule",
+             "hellinger",
+             "shellinger",
+             "geometric_mean",
+             "harmonic_mean",
+             "power_mean",
+             "weighted_sum",
+             "kmeans",
+             "kmeans2",
+             "knn_d1",
+             "knn_d2_mean",
+             "knn_d2_median",
+             "knn_d2_max",
+         }
+         label_metric = self.ft_params.get("label_metric", "seuclidean")
+         if label_metric not in metrics:
+             raise ValueError(
+                 f"Unsupported label metric: {label_metric}. Supported metrics are {', '.join(metrics)}"
+             )
+         best_trials = [
+             trial
+             for trial in study.best_trials
+             if (
+                 trial.values is not None
+                 and len(trial.values) == n_objectives
+                 and all(
+                     isinstance(value, (int, float)) and not np.isnan(value)
+                     for value in trial.values
+                 )
+             )
+         ]
+         if not best_trials:
+             return None
+         def calculate_distances(
+             normalized_matrix: np.ndarray, metric: str
+         ) -> np.ndarray:
+             n_objectives = normalized_matrix.shape[1]
+             n_samples = normalized_matrix.shape[0]
+             label_p_order = float(self.ft_params.get("label_p_order", 2.0))
+             np_weights = np.array(
+                 self.ft_params.get("label_weights", [1.0] * n_objectives)
+             )
+             if np_weights.size != n_objectives:
+                 raise ValueError("label_weights length must match number of objectives")
+             if np.any(np_weights < 0):
+                 raise ValueError("label_weights values must be non-negative")
+             label_weights_sum = np.sum(np_weights)
+             if np.isclose(label_weights_sum, 0):
+                 raise ValueError("label_weights sum cannot be zero")
+             np_weights = np_weights / label_weights_sum
+             knn_kwargs = {}
+             label_knn_metric = self.ft_params.get("label_knn_metric", "seuclidean")
+             if label_knn_metric == "minkowski" and isinstance(label_p_order, float):
+                 knn_kwargs["p"] = label_p_order
+             ideal_point = np.ones(n_objectives)
+             ideal_point_2d = ideal_point.reshape(1, -1)
+             if metric in {
+                 "braycurtis",
+                 "canberra",
+                 "chebyshev",
+                 "cityblock",
+                 "correlation",
+                 "cosine",
+                 "dice",
+                 "euclidean",
+                 "hamming",
+                 "jaccard",
+                 "jensenshannon",
+                 "kulczynski1",  # deprecated since version 1.15.0
+                 "mahalanobis",
+                 "matching",
+                 "minkowski",
+                 "rogerstanimoto",
+                 "russellrao",
+                 "seuclidean",
+                 "sokalmichener",  # deprecated since version 1.15.0
+                 "sokalsneath",
+                 "sqeuclidean",
+                 "yule",
+             }:
+                 cdist_kwargs = {"w": np_weights}
+                 if metric in {
+                     "jensenshannon",
+                     "mahalanobis",
+                     "seuclidean",
+                 }:
+                     del cdist_kwargs["w"]
+                 if metric == "minkowski" and isinstance(label_p_order, float):
+                     cdist_kwargs["p"] = label_p_order
+                 return sp.spatial.distance.cdist(
+                     normalized_matrix,
+                     ideal_point_2d,
+                     metric=metric,
+                     **cdist_kwargs,
+                 ).flatten()
+             elif metric in {"hellinger", "shellinger"}:
+                 np_sqrt_normalized_matrix = np.sqrt(normalized_matrix)
+                 if metric == "shellinger":
+                     np_weights = 1 / np.var(np_sqrt_normalized_matrix, axis=0, ddof=1)
+                 return np.sqrt(
+                     np.sum(
+                         np_weights
+                         * (np_sqrt_normalized_matrix - np.sqrt(ideal_point)) ** 2,
+                         axis=1,
+                     )
+                 ) / np.sqrt(2.0)
+             elif metric in {"geometric_mean", "harmonic_mean", "power_mean"}:
+                 p = {
+                     "geometric_mean": 0.0,
+                     "harmonic_mean": -1.0,
+                     "power_mean": label_p_order,
+                 }[metric]
+                 return sp.stats.pmean(
+                     ideal_point, p=p, weights=np_weights
+                 ) - sp.stats.pmean(normalized_matrix, p=p, weights=np_weights, axis=1)
+             elif metric == "weighted_sum":
+                 return np.sum(np_weights * (ideal_point - normalized_matrix), axis=1)
+             elif metric in {"kmeans", "kmeans2"}:
+                 if n_samples < 2:
+                     return np.full(n_samples, np.inf)
+                 n_clusters = min(max(2, int(np.sqrt(n_samples / 2))), 10, n_samples)
+                 if metric == "kmeans":
+                     kmeans = sklearn.cluster.KMeans(
+                         n_clusters=n_clusters, random_state=42, n_init=10
+                     )
+                     cluster_labels = kmeans.fit_predict(normalized_matrix)
+                     cluster_centers = kmeans.cluster_centers_
+                 elif metric == "kmeans2":
+                     cluster_centers, cluster_labels = sp.cluster.vq.kmeans2(
+                         normalized_matrix, n_clusters, rng=42, minit="++"
+                     )
+                 label_kmeans_metric = self.ft_params.get(
+                     "label_kmeans_metric", "seuclidean"
+                 )
+                 cdist_kwargs = {}
+                 if label_kmeans_metric == "minkowski" and isinstance(
+                     label_p_order, float
+                 ):
+                     cdist_kwargs["p"] = label_p_order
+                 cluster_distances_to_ideal = sp.spatial.distance.cdist(
+                     cluster_centers,
+                     ideal_point_2d,
+                     metric=label_kmeans_metric,
+                     **cdist_kwargs,
+                 ).flatten()
+                 return cluster_distances_to_ideal[cluster_labels]
+             elif metric == "knn_d1":
+                 if n_samples < 2:
+                     return np.full(n_samples, np.inf)
+                 nbrs = sklearn.neighbors.NearestNeighbors(
+                     n_neighbors=2, metric=label_knn_metric, **knn_kwargs
+                 ).fit(normalized_matrix)
+                 distances, _ = nbrs.kneighbors(normalized_matrix)
+                 return distances[:, 1]
+             elif metric in {"knn_d2_mean", "knn_d2_median", "knn_d2_max"}:
+                 if n_samples < 2:
+                     return np.full(n_samples, np.inf)
+                 n_neighbors = (
+                     min(
+                         int(self.ft_params.get("label_knn_d2_n_neighbors", 4)),
+                         n_samples - 1,
+                     )
+                     + 1
+                 )
+                 nbrs = sklearn.neighbors.NearestNeighbors(
+                     n_neighbors=n_neighbors, metric=label_knn_metric, **knn_kwargs
+                 ).fit(normalized_matrix)
+                 distances, _ = nbrs.kneighbors(normalized_matrix)
+                 if metric == "knn_d2_mean":
+                     return np.mean(distances[:, 1:], axis=1)
+                 elif metric == "knn_d2_median":
+                     return np.median(distances[:, 1:], axis=1)
+                 elif metric == "knn_d2_max":
+                     return np.max(distances[:, 1:], axis=1)
+             else:
+                 raise ValueError(
+                     f"Unsupported label metric: {metric}. Supported metrics are {', '.join(metrics)}"
+                 )
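+         # Min-max normalize every objective to [0, 1] with 1 as the ideal value,
+         # honoring each study direction and mapping ±inf onto the bounds, then
+         # pick the best trial as the one closest to the ideal point.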
+         objective_values_matrix = np.array([trial.values for trial in best_trials])
+         normalized_matrix = np.zeros_like(objective_values_matrix, dtype=float)
+         for i in range(objective_values_matrix.shape[1]):
+             current_column = objective_values_matrix[:, i]
+             current_direction = study.directions[i]
+             is_neg_inf_mask = np.isneginf(current_column)
+             is_pos_inf_mask = np.isposinf(current_column)
+             if current_direction == optuna.study.StudyDirection.MAXIMIZE:
+                 normalized_matrix[is_neg_inf_mask, i] = 0.0
+                 normalized_matrix[is_pos_inf_mask, i] = 1.0
+             else:
+                 normalized_matrix[is_neg_inf_mask, i] = 1.0
+                 normalized_matrix[is_pos_inf_mask, i] = 0.0
+             is_finite_mask = np.isfinite(current_column)
+             if np.any(is_finite_mask):
+                 finite_col = current_column[is_finite_mask]
+                 finite_min_val = np.min(finite_col)
+                 finite_max_val = np.max(finite_col)
+                 finite_range_val = finite_max_val - finite_min_val
+                 if np.isclose(finite_range_val, 0):
+                     if np.any(is_pos_inf_mask) and np.any(is_neg_inf_mask):
+                         normalized_matrix[is_finite_mask, i] = 0.5
+                     elif np.any(is_pos_inf_mask):
+                         normalized_matrix[is_finite_mask, i] = (
+                             0.0
+                             if current_direction == optuna.study.StudyDirection.MAXIMIZE
+                             else 1.0
+                         )
+                     elif np.any(is_neg_inf_mask):
+                         normalized_matrix[is_finite_mask, i] = (
+                             1.0
+                             if current_direction == optuna.study.StudyDirection.MAXIMIZE
+                             else 0.0
+                         )
+                     else:
+                         normalized_matrix[is_finite_mask, i] = 0.5
+                 else:
+                     if current_direction == optuna.study.StudyDirection.MAXIMIZE:
+                         normalized_matrix[is_finite_mask, i] = (
+                             finite_col - finite_min_val
+                         ) / finite_range_val
+                     else:
+                         normalized_matrix[is_finite_mask, i] = (
+                             finite_max_val - finite_col
+                         ) / finite_range_val
+         trial_distances = calculate_distances(normalized_matrix, metric=label_metric)
+         return best_trials[np.argmin(trial_distances)]
+     def optuna_optimize(
+         self,
+         pair: str,
+         namespace: str,
+         objective: Callable[[optuna.trial.Trial], float],
+         direction: Optional[optuna.study.StudyDirection] = None,
+         directions: Optional[list[optuna.study.StudyDirection]] = None,
+     ) -> None:
+         is_study_single_objective = direction is not None and directions is None
+         if not is_study_single_objective and (directions is None or len(directions) < 2):
+             raise ValueError(
+                 "Multi-objective study must have at least 2 directions specified"
+             )
+         study = self.optuna_create_study(
+             pair=pair,
+             namespace=namespace,
+             direction=direction,
+             directions=directions,
+         )
+         if not study:
+             return
+         if self._optuna_config.get("warm_start"):
+             self.optuna_enqueue_previous_best_params(pair, namespace, study)
+         objective_type = "single" if is_study_single_objective else "multi"
+         logger.info(
+             f"Optuna {pair} {namespace} {objective_type} objective hyperopt started"
+         )
+         start_time = time.time()
+         try:
+             study.optimize(
+                 objective,
+                 n_trials=self._optuna_config.get("n_trials"),
+                 n_jobs=self._optuna_config.get("n_jobs"),
+                 timeout=self._optuna_config.get("timeout"),
+                 gc_after_trial=True,
+             )
+         except Exception as e:
+             time_spent = time.time() - start_time
+             logger.error(
+                 f"Optuna {pair} {namespace} {objective_type} objective hyperopt failed ({time_spent:.2f} secs): {str(e)}",
+                 exc_info=True,
+             )
+             return
+         time_spent = time.time() - start_time
+         if is_study_single_objective:
+             if not QuickAdapterRegressorV3.optuna_study_has_best_trial(study):
+                 logger.error(
+                     f"Optuna {pair} {namespace} {objective_type} objective hyperopt failed ({time_spent:.2f} secs): no study best trial found"
+                 )
+                 return
+             self.set_optuna_value(pair, namespace, study.best_value)
+             self.set_optuna_params(pair, namespace, study.best_params)
+             study_results = {
+                 "value": self.get_optuna_value(pair, namespace),
+                 **self.get_optuna_params(pair, namespace),
+             }
+             metric_log_msg = ""
+         else:
+             best_trial = self.get_multi_objective_study_best_trial("label", study)
+             if not best_trial:
+                 logger.error(
+                     f"Optuna {pair} {namespace} {objective_type} objective hyperopt failed ({time_spent:.2f} secs): no study best trial found"
+                 )
+                 return
+             self.set_optuna_values(pair, namespace, best_trial.values)
+             self.set_optuna_params(pair, namespace, best_trial.params)
+             study_results = {
+                 "values": self.get_optuna_values(pair, namespace),
+                 **self.get_optuna_params(pair, namespace),
+             }
+             metric_log_msg = (
+                 f" using {self.ft_params.get('label_metric', 'seuclidean')} metric"
+             )
+         logger.info(
+             f"Optuna {pair} {namespace} {objective_type} objective done{metric_log_msg} ({time_spent:.2f} secs)"
+         )
+         for key, value in study_results.items():
+             logger.info(
+                 f"Optuna {pair} {namespace} {objective_type} objective hyperopt | {key:>20s} : {value}"
+             )
+         self.optuna_save_best_params(pair, namespace)
+     def optuna_storage(self, pair: str) -> optuna.storages.BaseStorage:
+         storage_dir = self.full_path
+         storage_filename = f"optuna-{pair.split('/')[0]}"
+         storage_backend = self._optuna_config.get("storage")
+         if storage_backend == "sqlite":
+             storage = optuna.storages.RDBStorage(
+                 url=f"sqlite:///{storage_dir}/{storage_filename}.sqlite",
+                 heartbeat_interval=60,
+                 failed_trial_callback=optuna.storages.RetryFailedTrialCallback(
+                     max_retry=3
+                 ),
+             )
+         elif storage_backend == "file":
+             storage = optuna.storages.JournalStorage(
+                 optuna.storages.journal.JournalFileBackend(
+                     f"{storage_dir}/{storage_filename}.log"
+                 )
+             )
+         else:
+             raise ValueError(
+                 f"Unsupported optuna storage backend: {storage_backend}. Supported backends are 'sqlite' and 'file'"
+             )
+         return storage
+     def optuna_create_study(
+         self,
+         pair: str,
+         namespace: str,
+         direction: Optional[optuna.study.StudyDirection] = None,
+         directions: Optional[list[optuna.study.StudyDirection]] = None,
+     ) -> Optional[optuna.study.Study]:
+         identifier = self.freqai_info.get("identifier")
+         study_name = f"{identifier}-{pair}-{namespace}"
+         try:
+             storage = self.optuna_storage(pair)
+         except Exception as e:
+             logger.error(
+                 f"Failed to create optuna storage for study {study_name}: {str(e)}",
+                 exc_info=True,
+             )
+             return None
+         continuous = self._optuna_config.get("continuous")
+         if continuous:
+             QuickAdapterRegressorV3.optuna_study_delete(study_name, storage)
+         try:
+             return optuna.create_study(
+                 study_name=study_name,
+                 sampler=optuna.samplers.TPESampler(
+                     n_startup_trials=self._optuna_config.get("n_startup_trials"),
+                     multivariate=True,
+                     group=True,
+                     seed=self._optuna_config.get("seed"),
+                 ),
+                 pruner=optuna.pruners.HyperbandPruner(min_resource=3),
+                 direction=direction,
+                 directions=directions,
+                 storage=storage,
+                 load_if_exists=not continuous,
+             )
+         except Exception as e:
+             logger.error(
+                 f"Failed to create optuna study {study_name}: {str(e)}", exc_info=True
+             )
+             return None
+     def optuna_enqueue_previous_best_params(
+         self, pair: str, namespace: str, study: optuna.study.Study
+     ) -> None:
+         best_params = self.get_optuna_params(pair, namespace)
+         if best_params:
+             study.enqueue_trial(best_params)
+         else:
+             best_params = self.optuna_load_best_params(pair, namespace)
+             if best_params:
+                 study.enqueue_trial(best_params)
+     def optuna_save_best_params(self, pair: str, namespace: str) -> None:
+         best_params_path = Path(
+             self.full_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json"
+         )
+         try:
+             with best_params_path.open("w", encoding="utf-8") as write_file:
+                 json.dump(self.get_optuna_params(pair, namespace), write_file, indent=4)
+         except Exception as e:
+             logger.error(
+                 f"Failed to save optuna {namespace} best params for {pair}: {str(e)}",
+                 exc_info=True,
+             )
+             raise
+     def optuna_load_best_params(
+         self, pair: str, namespace: str
+     ) -> Optional[dict[str, Any]]:
+         best_params_path = Path(
+             self.full_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json"
+         )
+         if best_params_path.is_file():
+             with best_params_path.open("r", encoding="utf-8") as read_file:
+                 return json.load(read_file)
+         return None
+     @staticmethod
+     def optuna_study_delete(
+         study_name: str, storage: optuna.storages.BaseStorage
+     ) -> None:
+         try:
+             optuna.delete_study(study_name=study_name, storage=storage)
+         except Exception:
+             pass
+     @staticmethod
+     def optuna_study_load(
+         study_name: str, storage: optuna.storages.BaseStorage
+     ) -> Optional[optuna.study.Study]:
+         try:
+             study = optuna.load_study(study_name=study_name, storage=storage)
+         except Exception:
+             study = None
+         return study
+     @staticmethod
+     def optuna_study_has_best_trial(study: Optional[optuna.study.Study]) -> bool:
+         if study is None:
+             return False
+         try:
+             _ = study.best_trial
+             return True
+         except (ValueError, KeyError):
+             return False
+     @staticmethod
+     def optuna_study_has_best_trials(study: Optional[optuna.study.Study]) -> bool:
+         if study is None:
+             return False
+         try:
+             _ = study.best_trials
+             return True
+         except (ValueError, KeyError):
+             return False
+ regressors = {"xgboost", "lightgbm"}
+ def get_callbacks(trial: optuna.trial.Trial, regressor: str) -> list[Callable]:
+     if regressor == "xgboost":
+         callbacks = [
+             optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")
+         ]
+     elif regressor == "lightgbm":
+         callbacks = [optuna.integration.LightGBMPruningCallback(trial, "rmse")]
+     else:
+         raise ValueError(
+             f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
+         )
+     return callbacks
+ def fit_regressor(
+     regressor: str,
+     X: pd.DataFrame,
+     y: pd.DataFrame,
+     train_weights: np.ndarray,
+     eval_set: Optional[list[tuple[pd.DataFrame, pd.DataFrame]]],
+     eval_weights: Optional[list[np.ndarray]],
+     model_training_parameters: dict[str, Any],
+     init_model: Any = None,
+     callbacks: Optional[list[Callable]] = None,
+ ) -> Any:
+     if regressor == "xgboost":
+         from xgboost import XGBRegressor
+         model = XGBRegressor(
+             objective="reg:squarederror",
+             eval_metric="rmse",
+             callbacks=callbacks,
+             **model_training_parameters,
+         )
+         model.fit(
+             X=X,
+             y=y,
+             sample_weight=train_weights,
+             eval_set=eval_set,
+             sample_weight_eval_set=eval_weights,
+             xgb_model=init_model,
+         )
+     elif regressor == "lightgbm":
+         from lightgbm import LGBMRegressor
+         model = LGBMRegressor(objective="regression", **model_training_parameters)
+         model.fit(
+             X=X,
+             y=y,
+             sample_weight=train_weights,
+             eval_set=eval_set,
+             eval_sample_weight=eval_weights,
+             eval_metric="rmse",
+             init_model=init_model,
+             callbacks=callbacks,
+         )
+     else:
+         raise ValueError(
+             f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
+         )
+     return model
+ @lru_cache(maxsize=128)
+ def calculate_min_extrema(
+     size: int, fit_live_predictions_candles: int, min_extrema: int = 4
+ ) -> int:
+     return int(round(size / fit_live_predictions_candles) * min_extrema)
+ def train_objective(
+     trial: optuna.trial.Trial,
+     regressor: str,
+     X: pd.DataFrame,
+     y: pd.DataFrame,
+     train_weights: np.ndarray,
+     X_test: pd.DataFrame,
+     y_test: pd.DataFrame,
+     test_weights: np.ndarray,
+     test_size: float,
+     fit_live_predictions_candles: int,
+     candles_step: int,
+     model_training_parameters: dict[str, Any],
+ ) -> float:
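+     # Suggest test/train window lengths, require enough extrema in each window,
+     # and score the windowed fit by weighted RMSE (np.inf when a window is invalid).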
+     test_ok = True
+     test_length = len(X_test)
+     if debug:
+         test_extrema = y_test.get(EXTREMA_COLUMN)
+         n_test_minima: int = sp.signal.find_peaks(-test_extrema)[0].size
+         n_test_maxima: int = sp.signal.find_peaks(test_extrema)[0].size
+         n_test_extrema: int = n_test_minima + n_test_maxima
+         min_test_extrema: int = calculate_min_extrema(
+             test_length, fit_live_predictions_candles
+         )
+         logger.info(
+             f"{test_length=}, {n_test_minima=}, {n_test_maxima=}, {n_test_extrema=}, {min_test_extrema=}"
+         )
+     min_test_window: int = fit_live_predictions_candles * 2
+     if test_length < min_test_window:
+         logger.warning(f"Insufficient test data: {test_length} < {min_test_window}")
+         test_ok = False
+     max_test_window: int = test_length
+     test_window: int = trial.suggest_int(
+         "test_period_candles", min_test_window, max_test_window, step=candles_step
+     )
+     X_test = X_test.iloc[-test_window:]
+     y_test = y_test.iloc[-test_window:]
+     test_extrema = y_test.get(EXTREMA_COLUMN)
+     n_test_minima: int = sp.signal.find_peaks(-test_extrema)[0].size
+     n_test_maxima: int = sp.signal.find_peaks(test_extrema)[0].size
+     n_test_extrema: int = n_test_minima + n_test_maxima
+     min_test_extrema: int = calculate_min_extrema(
+         test_window, fit_live_predictions_candles
+     )
+     if n_test_extrema < min_test_extrema:
+         if debug:
+             logger.warning(
+                 f"Insufficient extrema in test data with {test_window=}: {n_test_extrema=} < {min_test_extrema=}"
+             )
+         test_ok = False
+     test_weights = test_weights[-test_window:]
+     train_ok = True
+     train_length = len(X)
+     if debug:
+         train_extrema = y.get(EXTREMA_COLUMN)
+         n_train_minima: int = sp.signal.find_peaks(-train_extrema)[0].size
+         n_train_maxima: int = sp.signal.find_peaks(train_extrema)[0].size
+         n_train_extrema: int = n_train_minima + n_train_maxima
+         min_train_extrema: int = calculate_min_extrema(
+             train_length, fit_live_predictions_candles
+         )
+         logger.info(
+             f"{train_length=}, {n_train_minima=}, {n_train_maxima=}, {n_train_extrema=}, {min_train_extrema=}"
+         )
+     min_train_window: int = min_test_window * int(round(1 / test_size - 1))
+     if train_length < min_train_window:
+         logger.warning(f"Insufficient train data: {train_length} < {min_train_window}")
+         train_ok = False
+     max_train_window: int = train_length
+     train_window: int = trial.suggest_int(
+         "train_period_candles", min_train_window, max_train_window, step=candles_step
+     )
+     X = X.iloc[-train_window:]
+     y = y.iloc[-train_window:]
+     train_extrema = y.get(EXTREMA_COLUMN)
+     n_train_minima: int = sp.signal.find_peaks(-train_extrema)[0].size
+     n_train_maxima: int = sp.signal.find_peaks(train_extrema)[0].size
+     n_train_extrema: int = n_train_minima + n_train_maxima
+     min_train_extrema: int = calculate_min_extrema(
+         train_window, fit_live_predictions_candles
+     )
+     if n_train_extrema < min_train_extrema:
+         if debug:
+             logger.warning(
+                 f"Insufficient extrema in train data with {train_window=}: {n_train_extrema=} < {min_train_extrema=}"
+             )
+         train_ok = False
+     train_weights = train_weights[-train_window:]
+     if not test_ok or not train_ok:
+         return np.inf
+     model = fit_regressor(
+         regressor=regressor,
+         X=X,
+         y=y,
+         train_weights=train_weights,
+         eval_set=[(X_test, y_test)],
+         eval_weights=[test_weights],
+         model_training_parameters=model_training_parameters,
+         callbacks=get_callbacks(trial, regressor),
+     )
+     y_pred = model.predict(X_test)
+     return sklearn.metrics.root_mean_squared_error(
+         y_test, y_pred, sample_weight=test_weights
+     )
+ def get_optuna_study_model_parameters(
+     trial: optuna.trial.Trial,
+     regressor: str,
+     model_training_best_parameters: dict[str, Any],
+     expansion_factor: float,
+ ) -> dict[str, Any]:
+     if regressor not in regressors:
+         raise ValueError(
+             f"Unsupported regressor model: {regressor} (supported: {', '.join(regressors)})"
+         )
+     default_ranges = {
++        "n_estimators": (100, 1500),
+         "learning_rate": (1e-3, 0.5),
+         "min_child_weight": (1e-8, 100.0),
+         "subsample": (0.5, 1.0),
+         "colsample_bytree": (0.5, 1.0),
+         "reg_alpha": (1e-8, 100.0),
+         "reg_lambda": (1e-8, 100.0),
+         "max_depth": (3, 13),
+         "gamma": (1e-8, 10.0),
+         "num_leaves": (8, 256),
+         "min_split_gain": (1e-8, 10.0),
+         "min_child_samples": (10, 100),
+     }
++    log_scaled_params = {
++        "learning_rate",
++        "min_child_weight",
++        "reg_alpha",
++        "reg_lambda",
++        "gamma",
++        "min_split_gain",
++    }
++
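+     # Narrow each search range around the previous best value: log-scaled
+     # parameters expand multiplicatively by (1 + expansion_factor), linear ones
+     # by a margin proportional to the default span.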
+     ranges = copy.deepcopy(default_ranges)
+     if model_training_best_parameters:
+         for param, (default_min, default_max) in default_ranges.items():
+             center_value = model_training_best_parameters.get(param)
+             if (
+                 center_value is None
+                 or not isinstance(center_value, (int, float))
+                 or not np.isfinite(center_value)
+             ):
+                 continue
 -            if param in [
 -                "learning_rate",
 -                "min_child_weight",
 -                "reg_alpha",
 -                "reg_lambda",
 -                "gamma",
 -                "min_split_gain",
 -            ]:
++            if param in log_scaled_params:
+                 new_min = center_value / (1 + expansion_factor)
+                 new_max = center_value * (1 + expansion_factor)
+             else:
+                 margin = (default_max - default_min) * expansion_factor / 2
+                 new_min = center_value - margin
+                 new_max = center_value + margin
+             param_min = max(default_min, new_min)
+             param_max = min(default_max, new_max)
+             if param_min < param_max:
+                 ranges[param] = (param_min, param_max)
+     study_model_parameters = {
++        "n_estimators": trial.suggest_int(
++            "n_estimators",
++            int(ranges["n_estimators"][0]),
++            int(ranges["n_estimators"][1]),
++        ),
+         "learning_rate": trial.suggest_float(
+             "learning_rate",
+             ranges["learning_rate"][0],
+             ranges["learning_rate"][1],
+             log=True,
+         ),
+         "min_child_weight": trial.suggest_float(
+             "min_child_weight",
+             ranges["min_child_weight"][0],
+             ranges["min_child_weight"][1],
+             log=True,
+         ),
+         "subsample": trial.suggest_float(
+             "subsample", ranges["subsample"][0], ranges["subsample"][1]
+         ),
+         "colsample_bytree": trial.suggest_float(
+             "colsample_bytree",
+             ranges["colsample_bytree"][0],
+             ranges["colsample_bytree"][1],
+         ),
+         "reg_alpha": trial.suggest_float(
+             "reg_alpha", ranges["reg_alpha"][0], ranges["reg_alpha"][1], log=True
+         ),
+         "reg_lambda": trial.suggest_float(
+             "reg_lambda", ranges["reg_lambda"][0], ranges["reg_lambda"][1], log=True
+         ),
+     }
+     if regressor == "xgboost":
+         study_model_parameters.update(
+             {
+                 "max_depth": trial.suggest_int(
+                     "max_depth",
+                     int(ranges["max_depth"][0]),
+                     int(ranges["max_depth"][1]),
+                 ),
+                 "gamma": trial.suggest_float(
+                     "gamma", ranges["gamma"][0], ranges["gamma"][1], log=True
+                 ),
+             }
+         )
+     elif regressor == "lightgbm":
+         study_model_parameters.update(
+             {
+                 "num_leaves": trial.suggest_int(
+                     "num_leaves",
+                     int(ranges["num_leaves"][0]),
+                     int(ranges["num_leaves"][1]),
+                 ),
+                 "min_split_gain": trial.suggest_float(
+                     "min_split_gain",
+                     ranges["min_split_gain"][0],
+                     ranges["min_split_gain"][1],
+                     log=True,
+                 ),
+                 "min_child_samples": trial.suggest_int(
+                     "min_child_samples",
+                     int(ranges["min_child_samples"][0]),
+                     int(ranges["min_child_samples"][1]),
+                 ),
+             }
+         )
+     return study_model_parameters
+ def hp_objective(
+     trial: optuna.trial.Trial,
+     regressor: str,
+     X: pd.DataFrame,
+     y: pd.DataFrame,
+     train_weights: np.ndarray,
+     X_test: pd.DataFrame,
+     y_test: pd.DataFrame,
+     test_weights: np.ndarray,
+     model_training_best_parameters: dict[str, Any],
+     model_training_parameters: dict[str, Any],
+     expansion_factor: float,
+ ) -> float:
+     study_model_parameters = get_optuna_study_model_parameters(
+         trial, regressor, model_training_best_parameters, expansion_factor
+     )
+     model_training_parameters = {**model_training_parameters, **study_model_parameters}
+     model = fit_regressor(
+         regressor=regressor,
+         X=X,
+         y=y,
+         train_weights=train_weights,
+         eval_set=[(X_test, y_test)],
+         eval_weights=[test_weights],
+         model_training_parameters=model_training_parameters,
+         callbacks=get_callbacks(trial, regressor),
+     )
+     y_pred = model.predict(X_test)
+     return sklearn.metrics.root_mean_squared_error(
+         y_test, y_pred, sample_weight=test_weights
+     )
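+ # Illustrative usage sketch (not part of the model): hp_objective is meant to
+ # be wrapped in a lambda/functools.partial and handed to an Optuna study that
+ # minimizes the returned RMSE, e.g.
+ #   study = optuna.create_study(direction="minimize")
+ #   study.optimize(
+ #       lambda trial: hp_objective(trial, regressor, X, y, train_weights,
+ #                                  X_test, y_test, test_weights,
+ #                                  best_params, params, expansion_factor),
+ #       n_trials=36,
+ #   )
+ # where best_params/params are placeholder names; the class above wires this
+ # up with its own storage, sampler and continuation settings.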
+ def calculate_quantile(values: np.ndarray, value: float) -> float:
+     if values.size == 0:
+         return np.nan
+     first_value = values[0]
+     if np.all(np.isclose(values, first_value)):
+         return (
+             0.5
+             if np.isclose(value, first_value)
+             else (0.0 if value < first_value else 1.0)
+         )
+     return np.sum(values <= value) / values.size
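+ # Example: calculate_quantile(np.array([1.0, 2.0, 3.0, 4.0]), 3.0) == 0.75,
+ # the empirical CDF value of 3.0 within the window; a constant window
+ # degenerates to 0.0/0.5/1.0 depending on which side of it the value falls.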
+ class TrendDirection(IntEnum):
+     NEUTRAL = 0
+     UP = 1
+     DOWN = -1
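+ # zigzag() labels swing highs and lows: the reversal threshold at each candle
+ # is the NATR (as a fraction) scaled by natr_ratio, and a candidate pivot is
+ # only committed once is_pivot_confirmed() sees a statistically significant
+ # share of post-candidate closes sloping away from it (one-sided binomial
+ # test plus a volatility-adaptive majority threshold).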
+ def zigzag(
+     df: pd.DataFrame,
+     natr_period: int = 14,
+     natr_ratio: float = 6.0,
+ ) -> tuple[list[int], list[float], list[TrendDirection], list[float]]:
+     n = len(df)
+     if df.empty or n < natr_period:
+         return [], [], [], []
+     natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy()
+     indices: list[int] = df.index.tolist()
+     thresholds: np.ndarray = natr_values * natr_ratio
+     closes = df["close"].to_numpy()
+     highs = df["high"].to_numpy()
+     lows = df["low"].to_numpy()
+     state: TrendDirection = TrendDirection.NEUTRAL
+     pivots_indices: list[int] = []
+     pivots_values: list[float] = []
+     pivots_directions: list[TrendDirection] = []
+     pivots_thresholds: list[float] = []
+     last_pivot_pos: int = -1
+     candidate_pivot_pos: int = -1
+     candidate_pivot_value: float = np.nan
+     volatility_quantile_cache: dict[int, float] = {}
+     def calculate_volatility_quantile(pos: int) -> float:
+         if pos not in volatility_quantile_cache:
+             start = max(0, pos + 1 - natr_period)
+             end = min(pos + 1, n)
+             if start >= end:
+                 volatility_quantile_cache[pos] = np.nan
+             else:
+                 volatility_quantile_cache[pos] = calculate_quantile(
+                     natr_values[start:end], natr_values[pos]
+                 )
+         return volatility_quantile_cache[pos]
+     def calculate_slopes_ok_threshold(
+         pos: int,
+         min_threshold: float = 0.65,
+         max_threshold: float = 0.85,
+     ) -> float:
+         volatility_quantile = calculate_volatility_quantile(pos)
+         if np.isnan(volatility_quantile):
+             return median([min_threshold, max_threshold])
+         return min_threshold + (max_threshold - min_threshold) * volatility_quantile
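+     # Note: the slopes-ok threshold above adapts to local volatility:
+     # quantile 0.0 maps to 0.65, quantile 1.0 to 0.85, and an undefined
+     # quantile falls back to the midpoint 0.75.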
+     def update_candidate_pivot(pos: int, value: float):
+         nonlocal candidate_pivot_pos, candidate_pivot_value
+         if 0 <= pos < n:
+             candidate_pivot_pos = pos
+             candidate_pivot_value = value
+     def reset_candidate_pivot():
+         nonlocal candidate_pivot_pos, candidate_pivot_value
+         candidate_pivot_pos = -1
+         candidate_pivot_value = np.nan
+     def add_pivot(pos: int, value: float, direction: TrendDirection):
+         nonlocal last_pivot_pos
+         if pivots_indices and indices[pos] == pivots_indices[-1]:
+             return
+         pivots_indices.append(indices[pos])
+         pivots_values.append(value)
+         pivots_directions.append(direction)
+         pivots_thresholds.append(thresholds[pos])
+         last_pivot_pos = pos
+         reset_candidate_pivot()
+     slope_ok_cache: dict[tuple[int, int, TrendDirection, float], bool] = {}
+     def get_slope_ok(
+         pos: int,
+         candidate_pivot_pos: int,
+         direction: TrendDirection,
+         min_slope: float,
+     ) -> bool:
+         cache_key = (
+             pos,
+             candidate_pivot_pos,
+             direction,
+             min_slope,
+         )
+         if cache_key in slope_ok_cache:
+             return slope_ok_cache[cache_key]
+         if pos <= candidate_pivot_pos:
+             slope_ok_cache[cache_key] = False
+             return slope_ok_cache[cache_key]
+         log_candidate_pivot_close = np.log(closes[candidate_pivot_pos])
+         log_current_close = np.log(closes[pos])
+         log_slope_close = (log_current_close - log_candidate_pivot_close) / (
+             pos - candidate_pivot_pos
+         )
+         if direction == TrendDirection.UP:
+             slope_ok_cache[cache_key] = log_slope_close > min_slope
+         elif direction == TrendDirection.DOWN:
+             slope_ok_cache[cache_key] = log_slope_close < -min_slope
+         else:
+             slope_ok_cache[cache_key] = False
+         return slope_ok_cache[cache_key]
+     def is_pivot_confirmed(
+         pos: int,
+         candidate_pivot_pos: int,
+         direction: TrendDirection,
+         min_slope: float = np.finfo(float).eps,
+         alpha: float = 0.05,
+     ) -> bool:
+         start_pos = min(candidate_pivot_pos + 1, n)
+         end_pos = min(pos + 1, n)
+         n_slopes = max(0, end_pos - start_pos)
+         if n_slopes < 1:
+             return False
+         slopes_ok: list[bool] = []
+         for i in range(start_pos, end_pos):
+             slopes_ok.append(
+                 get_slope_ok(
+                     pos=i,
+                     candidate_pivot_pos=candidate_pivot_pos,
+                     direction=direction,
+                     min_slope=min_slope,
+                 )
+             )
+         slopes_ok_threshold = calculate_slopes_ok_threshold(candidate_pivot_pos)
+         n_slopes_ok = sum(slopes_ok)
+         binomtest = sp.stats.binomtest(
+             k=n_slopes_ok, n=n_slopes, p=0.5, alternative="greater"
+         )
+         return (
+             binomtest.pvalue <= alpha
+             and (n_slopes_ok / n_slopes) >= slopes_ok_threshold
+         )
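+     # Worked example for is_pivot_confirmed(): with 10 candles after the
+     # candidate and 9 confirming slopes, the one-sided binomial p-value is
+     # P(X >= 9 | n=10, p=0.5) = 11/1024 ~= 0.011 <= alpha, so the pivot is
+     # confirmed as long as 9/10 also clears the volatility-adaptive threshold.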
+     start_pos = 0
+     initial_high_pos = start_pos
+     initial_low_pos = start_pos
+     initial_high = highs[initial_high_pos]
+     initial_low = lows[initial_low_pos]
+     for i in range(start_pos + 1, n):
+         current_high = highs[i]
+         current_low = lows[i]
+         if current_high > initial_high:
+             initial_high, initial_high_pos = current_high, i
+         if current_low < initial_low:
+             initial_low, initial_low_pos = current_low, i
+         initial_move_from_high = (initial_high - current_low) / initial_high
+         initial_move_from_low = (current_high - initial_low) / initial_low
+         is_initial_high_move_significant = (
+             initial_move_from_high >= thresholds[initial_high_pos]
+         )
+         is_initial_low_move_significant = (
+             initial_move_from_low >= thresholds[initial_low_pos]
+         )
+         if is_initial_high_move_significant and is_initial_low_move_significant:
+             if initial_move_from_high > initial_move_from_low:
+                 add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
+                 state = TrendDirection.DOWN
+                 break
+             else:
+                 add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
+                 state = TrendDirection.UP
+                 break
+         else:
+             if is_initial_high_move_significant:
+                 add_pivot(initial_high_pos, initial_high, TrendDirection.UP)
+                 state = TrendDirection.DOWN
+                 break
+             elif is_initial_low_move_significant:
+                 add_pivot(initial_low_pos, initial_low, TrendDirection.DOWN)
+                 state = TrendDirection.UP
+                 break
+     else:
+         return [], [], [], []
+     for i in range(last_pivot_pos + 1, n):
+         current_high = highs[i]
+         current_low = lows[i]
+         if state == TrendDirection.UP:
+             if np.isnan(candidate_pivot_value) or current_high > candidate_pivot_value:
+                 update_candidate_pivot(i, current_high)
+             if (
+                 candidate_pivot_value - current_low
+             ) / candidate_pivot_value >= thresholds[
+                 candidate_pivot_pos
+             ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.DOWN):
+                 add_pivot(candidate_pivot_pos, candidate_pivot_value, TrendDirection.UP)
+                 state = TrendDirection.DOWN
+         elif state == TrendDirection.DOWN:
+             if np.isnan(candidate_pivot_value) or current_low < candidate_pivot_value:
+                 update_candidate_pivot(i, current_low)
+             if (
+                 current_high - candidate_pivot_value
+             ) / candidate_pivot_value >= thresholds[
+                 candidate_pivot_pos
+             ] and is_pivot_confirmed(i, candidate_pivot_pos, TrendDirection.UP):
+                 add_pivot(
+                     candidate_pivot_pos, candidate_pivot_value, TrendDirection.DOWN
+                 )
+                 state = TrendDirection.UP
+     return pivots_indices, pivots_values, pivots_directions, pivots_thresholds
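+ # Usage sketch (illustrative): given an OHLCV dataframe with "high", "low"
+ # and "close" columns,
+ #   indices, values, directions, thresholds = zigzag(df, natr_period=14, natr_ratio=6.0)
+ # yields four parallel lists with one entry per confirmed pivot;
+ # TrendDirection.UP marks confirmed highs and TrendDirection.DOWN lows.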
+ @lru_cache(maxsize=8)
+ def largest_divisor(integer: int, step: int) -> Optional[int]:
+     if not isinstance(integer, int) or integer <= 0:
+         raise ValueError("integer must be a positive integer")
+     if not isinstance(step, int) or step <= 0:
+         raise ValueError("step must be a positive integer")
+     q_start = math.floor(0.5 * step) + 1
+     q_end = math.ceil(1.5 * step) - 1
+     if q_start > q_end:
+         return None
+     for q in range(q_start, q_end + 1):
+         if integer % q == 0:
+             return integer // q
+     return None
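+ # Despite the name, the search runs over the cofactor: it scans q within
+ # roughly +/-50% of step (ascending) and returns integer // q for the first q
+ # dividing integer, i.e. the largest divisor whose cofactor is near step.
+ # Example: largest_divisor(100, 10) scans q = 6..14, hits q = 10 and returns
+ # 10; largest_divisor(36, 4) scans q = 3..5, hits q = 3 and returns 12.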
+ def label_objective(
+     trial: optuna.trial.Trial,
+     df: pd.DataFrame,
+     fit_live_predictions_candles: int,
+     candles_step: int,
+ ) -> tuple[float, int]:
+     fit_live_predictions_candles_largest_divisor = largest_divisor(
+         fit_live_predictions_candles, candles_step
+     )
+     if fit_live_predictions_candles_largest_divisor is None:
+         raise ValueError(
+             f"Could not find a suitable largest divisor for {fit_live_predictions_candles} with step {candles_step}, please change your fit_live_predictions_candles or candles_step parameters"
+         )
+     min_label_period_candles: int = round_to_nearest_int(
+         max(
+             fit_live_predictions_candles
+             // fit_live_predictions_candles_largest_divisor,
+             candles_step,
+         ),
+         candles_step,
+     )
+     max_label_period_candles: int = round_to_nearest_int(
+         max(fit_live_predictions_candles // 4, min_label_period_candles),
+         candles_step,
+     )
+     label_period_candles = trial.suggest_int(
+         "label_period_candles",
+         min_label_period_candles,
+         max_label_period_candles,
+         step=candles_step,
+     )
+     label_natr_ratio = trial.suggest_float("label_natr_ratio", 2.0, 38.0, step=0.01)
+     df = df.iloc[
+         -(
+             max(2, int(fit_live_predictions_candles / label_period_candles))
+             * label_period_candles
+         ) :
+     ]
+     if df.empty:
+         return -np.inf, -np.inf
+     _, pivots_values, _, pivots_thresholds = zigzag(
+         df,
+         natr_period=label_period_candles,
+         natr_ratio=label_natr_ratio,
+     )
+     if not pivots_thresholds:
+         return -np.inf, -np.inf
+     return np.median(pivots_thresholds), len(pivots_values)
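+ # The two returned values (median pivot threshold, pivot count) form a
+ # multi-objective trial; the -inf sentinels on the degenerate paths indicate
+ # that the calling study maximizes both objectives.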
+ def smoothed_max(series: pd.Series, temperature: float = 1.0) -> float:
+     data_array = series.to_numpy()
+     if data_array.size == 0:
+         return np.nan
+     if temperature < 0:
+         raise ValueError("temperature must be non-negative")
+     if np.isclose(temperature, 0):
+         return data_array.max()
+     return sp.special.logsumexp(temperature * data_array) / temperature
+ def smoothed_min(series: pd.Series, temperature: float = 1.0) -> float:
+     data_array = series.to_numpy()
+     if data_array.size == 0:
+         return np.nan
+     if temperature < 0:
+         raise ValueError("temperature must be non-negative")
+     if np.isclose(temperature, 0):
+         return data_array.min()
+     return -sp.special.logsumexp(-temperature * data_array) / temperature
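+ # Both smooth extrema are log-sum-exp relaxations: smoothed_max(pd.Series(
+ # [1.0, 2.0, 3.0])) at temperature 1.0 is log(e + e^2 + e^3) ~= 3.41, an
+ # upper bound on the true max that tightens as temperature grows;
+ # smoothed_min is the mirrored lower bound.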
+ def boltzmann_operator(series: pd.Series, alpha: float) -> float:
+     """
+     Compute the Boltzmann operator of a series with parameter alpha.
+     """
+     data_array = series.to_numpy()
+     if data_array.size == 0:
+         return np.nan
+     if np.isclose(alpha, 0):
+         return np.mean(data_array)
+     scaled_data = alpha * data_array
+     shifted_exponentials = np.exp(scaled_data - np.max(scaled_data))
+     numerator = np.sum(data_array * shifted_exponentials)
+     denominator = np.sum(shifted_exponentials)
+     return numerator / denominator
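+ # Example: for pd.Series([1.0, 2.0, 3.0]), alpha = 0 gives the mean 2.0,
+ # alpha = 10 gives ~3.0 (near the max) and alpha = -10 gives ~1.0 (near the
+ # min); being a weighted mean, the result always stays within [min, max],
+ # unlike the log-sum-exp relaxations above.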
+ def round_to_nearest_int(value: float, step: int) -> int:
+     """
+     Round a value to the nearest multiple of a given step.
+     :param value: The value to round.
+     :param step: The step size to round to (must be a positive integer).
+     :return: The rounded value.
+     :raises ValueError: If step is not a positive integer.
+     """
+     if not isinstance(step, int) or step <= 0:
+         raise ValueError("step must be a positive integer")
+     return int(round(value / step) * step)
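+ # Example: round_to_nearest_int(12.6, 5) == 15 and
+ # round_to_nearest_int(11.0, 5) == 10; exact ties follow Python's banker's
+ # rounding, so round_to_nearest_int(12.5, 5) == 10.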