import sklearn
from datasieve.pipeline import Pipeline
from datasieve.transforms import SKLearnWrapper
+from freqtrade.exceptions import DependencyException
from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from numpy.typing import NDArray
from optuna.study.study import ObjectiveFuncType
+from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import (
MaxAbsScaler,
MinMaxScaler,
https://github.com/sponsors/robcaulk
"""
- version = "3.11.1"
+ version = "3.11.2"
_TEST_SIZE: Final[float] = 0.1
OPTUNA_SPACE_FRACTION_DEFAULT: Final[float] = 0.4
OPTUNA_SEED_DEFAULT: Final[int] = 1
+ _DATA_SPLIT_METHODS: Final[tuple[str, ...]] = (
+ "train_test_split",
+ "timeseries_split",
+ )
+ DATA_SPLIT_METHOD_DEFAULT: Final[str] = _DATA_SPLIT_METHODS[0]
+ TIMESERIES_N_SPLITS_DEFAULT: Final[int] = 5
+ TIMESERIES_GAP_DEFAULT: Final[int] = 0
+ TIMESERIES_MAX_TRAIN_SIZE_DEFAULT: Final[int | None] = None
+
@staticmethod
@lru_cache(maxsize=None)
def _extrema_selection_methods_set() -> set[ExtremaSelectionMethod]:
def _power_mean_metrics_set() -> set[str]:
return set(QuickAdapterRegressorV3._POWER_MEAN_MAP.keys())
+ @staticmethod
+ @lru_cache(maxsize=None)
+ def _data_split_methods_set() -> set[str]:
+ return set(QuickAdapterRegressorV3._DATA_SPLIT_METHODS)
+
@staticmethod
def _get_selection_category(method: str) -> Optional[str]:
for (
logger.info(f"Model Version: {self.version}")
logger.info(f"Regressor: {self.regressor}")
- logger.info("Optuna Hyperopt Configuration:")
+ logger.info("Optuna Hyperopt:")
optuna_config = self._optuna_config
logger.info(f" enabled: {optuna_config.get('enabled')}")
if optuna_config.get("enabled"):
label_pipeline = self.label_pipeline
label_prediction = self.label_prediction
for label_col in LABEL_COLUMNS:
- logger.info(f"Label Configuration [{label_col}]:")
+ logger.info(f"Label [{label_col}]:")
col_pipeline = get_label_column_config(
label_col, label_pipeline["default"], label_pipeline["columns"]
feature_range = self.ft_params.get(
"range", QuickAdapterRegressorV3.RANGE_DEFAULT
)
- logger.info("Feature Parameters Configuration:")
+ logger.info("Feature Parameters:")
logger.info(f" scaler: {scaler}")
logger.info(
f" range: ({format_number(feature_range[0])}, {format_number(feature_range[1])})"
]
)
+ def train(
+ self, unfiltered_df: pd.DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs
+ ) -> Any:
+ """
+        Filter the training data and train a model on it.
+
+ Supports two data split methods:
+ - 'train_test_split' (default): Delegates to BaseRegressionModel.train()
+ - 'timeseries_split': Chronological split with configurable gap. Uses the final
+ fold from sklearn's TimeSeriesSplit.
+
+ :param unfiltered_df: Full dataframe for the current training period
+ :param pair: Trading pair being trained
+ :param dk: FreqaiDataKitchen object containing configuration
+ :return: Trained model
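+
+        Example "data_split_parameters" entry in the freqai config
+        (illustrative values; these mirror the class defaults above):
+
+            "data_split_parameters": {
+                "method": "timeseries_split",
+                "n_splits": 5,
+                "gap": 0,
+                "max_train_size": null,
+                "test_size": 0.1
+            }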
+ """
+ method = self.data_split_parameters.get(
+ "method", QuickAdapterRegressorV3.DATA_SPLIT_METHOD_DEFAULT
+ )
+
+ if method not in QuickAdapterRegressorV3._data_split_methods_set():
+ raise ValueError(
+ f"Invalid data_split_parameters.method value {method!r}: "
+ f"supported values are {', '.join(QuickAdapterRegressorV3._DATA_SPLIT_METHODS)}"
+ )
+
+ logger.info(f"Using data split method: {method}")
+
+ if method == QuickAdapterRegressorV3.DATA_SPLIT_METHOD_DEFAULT:
+ return super().train(unfiltered_df, pair, dk, **kwargs)
+
+ elif (
+ method == QuickAdapterRegressorV3._DATA_SPLIT_METHODS[1]
+ ): # timeseries_split
+ logger.info(
+ f"-------------------- Starting training {pair} --------------------"
+ )
+
+ start_time = time.time()
+
+ features_filtered, labels_filtered = dk.filter_features(
+ unfiltered_df,
+ dk.training_features_list,
+ dk.label_list,
+ training_filter=True,
+ )
+
+ start_date = unfiltered_df["date"].iloc[0].strftime("%Y-%m-%d")
+ end_date = unfiltered_df["date"].iloc[-1].strftime("%Y-%m-%d")
+ logger.info(
+ f"-------------------- Training on data from {start_date} to "
+ f"{end_date} --------------------"
+ )
+
+ dd = self._make_timeseries_split_datasets(
+ features_filtered, labels_filtered, dk
+ )
+
+ if (
+ not self.freqai_info.get("fit_live_predictions_candles", 0)
+ or not self.live
+ ):
+ dk.fit_labels()
+
+ dd = self._apply_pipelines(dd, dk, pair)
+
+ logger.info(
+ f"Training model on {len(dd['train_features'].columns)} features"
+ )
+ logger.info(f"Training model on {len(dd['train_features'])} data points")
+
+ model = self.fit(dd, dk, **kwargs)
+
+ end_time = time.time()
+
+ logger.info(
+ f"-------------------- Done training {pair} "
+ f"({end_time - start_time:.2f} secs) --------------------"
+ )
+
+ return model
+
+ def _apply_pipelines(
+ self,
+ dd: dict,
+ dk: FreqaiDataKitchen,
+ pair: str,
+ ) -> dict:
+ """
+ Apply feature and label pipelines to train/test data.
+
+ This helper reduces code duplication between train() methods that need
+ custom data splitting but share the same pipeline application logic.
+
+ :param dd: data_dictionary with train/test features/labels/weights
+ :param dk: FreqaiDataKitchen instance
+ :param pair: Trading pair (for error messages)
+ :return: data_dictionary with transformed features/labels
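+
+        The dd keys consumed here are "train_features", "train_labels",
+        "train_weights", "test_features", "test_labels" and "test_weights",
+        as produced by dk.build_data_dictionary().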
+ """
+ dk.feature_pipeline = self.define_data_pipeline(threads=dk.thread_count)
+ dk.label_pipeline = self.define_label_pipeline(threads=dk.thread_count)
+
+ (dd["train_features"], dd["train_labels"], dd["train_weights"]) = (
+ dk.feature_pipeline.fit_transform(
+ dd["train_features"], dd["train_labels"], dd["train_weights"]
+ )
+ )
+
+ dd["train_labels"], _, _ = dk.label_pipeline.fit_transform(dd["train_labels"])
+
+ if (
+ self.data_split_parameters.get(
+ "test_size", QuickAdapterRegressorV3._TEST_SIZE
+ )
+ != 0
+ ):
+ if dd["test_labels"].shape[0] == 0:
+ method = self.data_split_parameters.get(
+ "method", QuickAdapterRegressorV3.DATA_SPLIT_METHOD_DEFAULT
+ )
+ if (
+ method == QuickAdapterRegressorV3._DATA_SPLIT_METHODS[1]
+ ): # timeseries_split
+ n_splits = self.data_split_parameters.get(
+ "n_splits", QuickAdapterRegressorV3.TIMESERIES_N_SPLITS_DEFAULT
+ )
+ gap = self.data_split_parameters.get(
+ "gap", QuickAdapterRegressorV3.TIMESERIES_GAP_DEFAULT
+ )
+ max_train_size = self.data_split_parameters.get("max_train_size")
+ test_size = self.data_split_parameters.get("test_size")
+ error_msg = (
+ f"{pair}: test set is empty after filtering. "
+ f"Possible causes: n_splits too high, gap too large, "
+ f"max_train_size too restrictive, or insufficient data. "
+ f"Current parameters: n_splits={n_splits}, gap={gap}, "
+ f"max_train_size={max_train_size}, test_size={test_size}. "
+ f"Try reducing n_splits/gap or increasing data period."
+ )
+ else:
+ test_size = self.data_split_parameters.get(
+ "test_size", QuickAdapterRegressorV3._TEST_SIZE
+ )
+ error_msg = (
+ f"{pair}: test set is empty after filtering. "
+ f"Possible causes: overly strict SVM thresholds or insufficient data. "
+ f"Current test_size={test_size}. "
+ f"Try reducing test_size or relaxing SVM conditions."
+ )
+ raise DependencyException(error_msg)
+ else:
+ (dd["test_features"], dd["test_labels"], dd["test_weights"]) = (
+ dk.feature_pipeline.transform(
+ dd["test_features"], dd["test_labels"], dd["test_weights"]
+ )
+ )
+ dd["test_labels"], _, _ = dk.label_pipeline.transform(dd["test_labels"])
+
+ dk.data_dictionary = dd
+
+ return dd
+
+ def _make_timeseries_split_datasets(
+ self,
+ filtered_dataframe: pd.DataFrame,
+ labels: pd.DataFrame,
+ dk: FreqaiDataKitchen,
+ ) -> dict:
+ """
+ Chronological train/test split using the final fold from sklearn's TimeSeriesSplit.
+
+ n_splits controls train/test proportions (higher = larger train set).
+ gap excludes samples between train/test; when 0, auto-calculated from
+ label_period_candles. max_train_size enables sliding window mode.
+
+ :param filtered_dataframe: Feature data to split
+ :param labels: Label data to split
+ :param dk: FreqaiDataKitchen instance for weight calculation and data building
+ :return: data_dictionary with train/test features/labels/weights
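+
+        Illustrative fold arithmetic (per sklearn's documented behavior):
+        with 1200 rows, n_splits=5, gap=24 and test_size=None, each fold's
+        test size is 1200 // (5 + 1) = 200, so the final fold tests on rows
+        1000..1199 and trains on rows 0..975 (the gap trims 24 rows off the
+        end of the training window).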
+ """
+ n_splits = int(
+ self.data_split_parameters.get(
+ "n_splits", QuickAdapterRegressorV3.TIMESERIES_N_SPLITS_DEFAULT
+ )
+ )
+ gap = int(
+ self.data_split_parameters.get(
+ "gap", QuickAdapterRegressorV3.TIMESERIES_GAP_DEFAULT
+ )
+ )
+ max_train_size = self.data_split_parameters.get(
+ "max_train_size", QuickAdapterRegressorV3.TIMESERIES_MAX_TRAIN_SIZE_DEFAULT
+ )
+ max_train_size = int(max_train_size) if max_train_size is not None else None
+
+ if n_splits < 2:
+ raise ValueError(
+ f"Invalid data_split_parameters.n_splits value {n_splits!r}: must be >= 2"
+ )
+ if gap < 0:
+ raise ValueError(
+ f"Invalid data_split_parameters.gap value {gap!r}: must be >= 0"
+ )
+ if max_train_size is not None and max_train_size < 1:
+ raise ValueError(
+ f"Invalid data_split_parameters.max_train_size value {max_train_size!r}: "
+ f"must be >= 1 or None"
+ )
+
+        test_size = self.data_split_parameters.get("test_size", None)
+        if test_size is not None:
+            if isinstance(test_size, float) and 0 < test_size < 1:
+                # Convert a fractional test_size into an absolute sample count
+                test_size = int(len(filtered_dataframe) * test_size)
+            elif not (isinstance(test_size, int) and test_size >= 1):
+                raise ValueError(
+                    f"Invalid data_split_parameters.test_size value {test_size!r}: "
+                    f"must be float in (0, 1) as fraction, int >= 1 as count, or None"
+                )
+            if test_size < 1:
+                raise ValueError(
+                    f"Computed test_size ({test_size}) is too small. "
+                    f"Increase test_size or provide more data."
+                )
+
+ if gap == 0:
+ label_period_candles = int(
+ self.ft_params.get("label_period_candles", self._label_defaults[0])
+ )
+ gap = label_period_candles
+ logger.info(
+ f"TimeSeriesSplit gap auto-calculated from label_period_candles: {gap}"
+ )
+
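+        # Assumption about label construction: labels look ahead up to
+        # label_period_candles candles, so a gap of at least that many rows
+        # keeps training labels from overlapping the test window (leakage).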
+ tscv = TimeSeriesSplit(
+ n_splits=n_splits,
+ gap=gap,
+ max_train_size=max_train_size,
+ test_size=test_size,
+ )
+        # TimeSeriesSplit yields folds in chronological order; keep only the
+        # final fold, which carries the largest training window.
+        *_, (train_idx, test_idx) = tscv.split(filtered_dataframe)
+
+ train_features = filtered_dataframe.iloc[train_idx]
+ test_features = filtered_dataframe.iloc[test_idx]
+ train_labels = labels.iloc[train_idx]
+ test_labels = labels.iloc[test_idx]
+
+ feature_parameters = self.freqai_info.get("feature_parameters", {})
+ if feature_parameters.get("weight_factor", 0) > 0:
+ total_weights = dk.set_weights_higher_recent(len(train_idx) + len(test_idx))
+ train_weights = total_weights[: len(train_idx)]
+ test_weights = total_weights[len(train_idx) :]
+ else:
+ train_weights = np.ones(len(train_idx))
+ test_weights = np.ones(len(test_idx))
+
+ return dk.build_data_dictionary(
+ train_features,
+ test_features,
+ train_labels,
+ test_labels,
+ train_weights,
+ test_weights,
+ )
+
def fit(
self, data_dictionary: dict[str, Any], dk: FreqaiDataKitchen, **kwargs
) -> Any: