From 2d480b31d2bb289bd9ec8de9abbfeb2f7f882d07 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 2 Apr 2025 14:49:32 +0200 Subject: [PATCH] refactor(qav3): merge model implementations MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Merge the LightGBMRegressorQuickAdapterV3 and XGBoostRegressorQuickAdapterV3 freqaimodels into a single QuickAdapterRegressorV3 implementation: the gradient boosting library is now selected through the new freqai.regressor configuration key (supported values: xgboost and lightgbm). Also harden best params persistence with explicit error handling and move ReforceXY type hints to typing.Optional. Signed-off-by: Jérôme Benoit --- ReforceXY/user_data/freqaimodels/ReforceXY.py | 21 +- quickadapter/user_data/config-template.json | 7 +- .../LightGBMRegressorQuickAdapterV3.py | 685 ------------------ ...dapterV3.py => QuickAdapterRegressorV3.py} | 587 ++++++++------- .../user_data/strategies/QuickAdapterV3.py | 44 +- quickadapter/user_data/strategies/Utils.py | 5 +- 6 files changed, 388 insertions(+), 961 deletions(-) delete mode 100644 quickadapter/user_data/freqaimodels/LightGBMRegressorQuickAdapterV3.py rename quickadapter/user_data/freqaimodels/{XGBoostRegressorQuickAdapterV3.py => QuickAdapterRegressorV3.py} (61%) diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py index 654d7d1..10fdc61 100644 --- a/ReforceXY/user_data/freqaimodels/ReforceXY.py +++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py @@ -486,7 +486,7 @@ class ReforceXY(BaseReinforcementLearningModel): output = output.rolling(window=self.CONV_WIDTH).apply(_predict) return output - def get_storage(self, pair: str | None = None) -> BaseStorage | None: + def get_storage(self, pair: Optional[str] = None) -> Optional[BaseStorage]: """ Get the storage for Optuna """ @@ -502,7 +502,7 @@ class ReforceXY(BaseReinforcementLearningModel): return storage @staticmethod - def study_has_best_trial_params(study: Study | None) -> bool: + def study_has_best_trial_params(study: Optional[Study]) -> bool: if study is None: return False try: @@ -517,7 +517,7 @@ class ReforceXY(BaseReinforcementLearningModel): def study( self, train_df: DataFrame, total_timesteps: int, dk: FreqaiDataKitchen - ) -> Dict | None: + ) -> Optional[Dict]: """ Runs hyperparameter optimization using Optuna and returns the best hyperparameters found merged with the user defined parameters @@ -608,7 +608,7 @@ class ReforceXY(BaseReinforcementLearningModel): return {**self.model_training_parameters, **best_trial_params} def save_best_trial_params( - self, best_trial_params: Dict, pair: str | None = None + self, best_trial_params: Dict, pair: Optional[str] = None ) -> None: """ Save the best trial hyperparameters found during hyperparameter optimization @@ -627,10 +627,17 @@ class ReforceXY(BaseReinforcementLearningModel): else f"saving best params to {best_trial_params_path} JSON file" ) logger.info(log_msg) - with best_trial_params_path.open("w", encoding="utf-8") as write_file: - json.dump(best_trial_params, write_file, indent=4) + try: + with best_trial_params_path.open("w", encoding="utf-8") as write_file: + json.dump(best_trial_params, write_file, indent=4) + except Exception as e: + logger.error( + f"Error saving best trial params to {best_trial_params_path}: {e}", + exc_info=True, + ) + raise - def load_best_trial_params(self, pair: str | None = None) -> Dict | None: + def load_best_trial_params(self, pair: Optional[str] = None) -> Optional[Dict]: """ Load the best trial hyperparameters found and saved during hyperparameter optimization """ diff --git a/quickadapter/user_data/config-template.json b/quickadapter/user_data/config-template.json index 4d428c1..fecf86e 100644 --- a/quickadapter/user_data/config-template.json +++ b/quickadapter/user_data/config-template.json @@ -1,8 +1,7 @@ { "$schema":
"https://schema.freqtrade.io/schema.json", "strategy": "QuickAdapterV3", - "freqaimodel": "XGBoostRegressorQuickAdapterV3", - // "freqaimodel": "LightGBMRegressorQuickAdapterV3", + "freqaimodel": "QuickAdapterRegressorV3", "max_open_trades": 10, "stake_currency": "USDT", "stake_amount": "unlimited", @@ -105,6 +104,8 @@ }, "freqai": { "enabled": true, + "regressor": "xgboost", + // "regressor": "lightgbm", "conv_width": 1, "purge_old_models": 2, "expiration_hours": 12, @@ -113,7 +114,7 @@ "backtest_period_days": 2, "write_metrics_to_disk": false, "identifier": "quickadapter-xgboost", - // "identifier": "quickadapter-lgbm", + // "identifier": "quickadapter-lightgbm", "fit_live_predictions_candles": 600, "data_kitchen_thread_count": 6, // set to number of CPU threads / 4 "track_performance": false, diff --git a/quickadapter/user_data/freqaimodels/LightGBMRegressorQuickAdapterV3.py b/quickadapter/user_data/freqaimodels/LightGBMRegressorQuickAdapterV3.py deleted file mode 100644 index cffc45e..0000000 --- a/quickadapter/user_data/freqaimodels/LightGBMRegressorQuickAdapterV3.py +++ /dev/null @@ -1,685 +0,0 @@ -import logging -import json -from statistics import geometric_mean -from typing import Any -from pathlib import Path - -from lightgbm import LGBMRegressor -import time -from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel -from freqtrade.freqai.data_kitchen import FreqaiDataKitchen -import pandas as pd -import scipy as sp -import optuna -import sklearn -import warnings - -N_TRIALS = 36 -TEST_SIZE = 0.1 - -EXTREMA_COLUMN = "&s-extrema" -MINIMA_THRESHOLD_COLUMN = "&s-minima_threshold" -MAXIMA_THRESHOLD_COLUMN = "&s-maxima_threshold" - -warnings.simplefilter(action="ignore", category=FutureWarning) - -logger = logging.getLogger(__name__) - - -class LightGBMRegressorQuickAdapterV3(BaseRegressionModel): - """ - The following freqaimodel is released to sponsors of the non-profit FreqAI open-source project. - If you find the FreqAI project useful, please consider supporting it by becoming a sponsor. - We use sponsor money to help stimulate new features and to pay for running these public - experiments, with a an objective of helping the community make smarter choices in their - ML journey. - - This freqaimodel is experimental (as with all models released to sponsors). Do *not* expect - returns. The goal is to demonstrate gratitude to people who support the project and to - help them find a good starting point for their own creativity. 
- - If you have questions, please direct them to our discord: https://discord.gg/xE4RMg4QYw - - https://github.com/sponsors/robcaulk - """ - - version = "3.6.3" - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.pairs = self.config.get("exchange", {}).get("pair_whitelist") - if not self.pairs: - raise ValueError( - "FreqAI model requires StaticPairList method defined in pairlists configuration and pair_whitelist defined in exchange section configuration" - ) - self.__optuna_config = self.freqai_info.get("optuna_hyperopt", {}) - self.__optuna_hyperopt: bool = ( - self.freqai_info.get("enabled", False) - and self.__optuna_config.get("enabled", False) - and self.data_split_parameters.get("test_size", TEST_SIZE) > 0 - ) - self.__optuna_hp_rmse: dict[str, float] = {} - self.__optuna_period_rmse: dict[str, float] = {} - self.__optuna_hp_params: dict[str, dict] = {} - self.__optuna_period_params: dict[str, dict] = {} - for pair in self.pairs: - self.__optuna_hp_rmse[pair] = -1 - self.__optuna_period_rmse[pair] = -1 - self.__optuna_hp_params[pair] = ( - self.optuna_load_best_params(pair, "hp") or {} - ) - self.__optuna_period_params[pair] = ( - self.optuna_load_best_params(pair, "period") or {} - ) - logger.info( - f"Initialized {self.__class__.__name__} model version {self.version}" - ) - - def fit(self, data_dictionary: dict, dk: FreqaiDataKitchen, **kwargs) -> Any: - """ - User sets up the training and test data to fit their desired model here - :param data_dictionary: the dictionary constructed by DataHandler to hold - all the training and test data/labels. - """ - - X = data_dictionary["train_features"] - y = data_dictionary["train_labels"] - train_weights = data_dictionary["train_weights"] - - X_test = data_dictionary["test_features"] - y_test = data_dictionary["test_labels"] - test_weights = data_dictionary["test_weights"] - - model_training_parameters = self.model_training_parameters - - init_model = self.get_init_model(dk.pair) - - start = time.time() - if self.__optuna_hyperopt: - optuna_hp_params, optuna_hp_rmse = self.optuna_hp_optimize( - dk.pair, X, y, train_weights, X_test, y_test, test_weights - ) - if optuna_hp_params: - self.__optuna_hp_params[dk.pair] = optuna_hp_params - if optuna_hp_rmse: - self.__optuna_hp_rmse[dk.pair] = optuna_hp_rmse - - if self.__optuna_hp_params.get(dk.pair): - model_training_parameters = { - **model_training_parameters, - **self.__optuna_hp_params[dk.pair], - } - - optuna_period_params, optuna_period_rmse = self.optuna_period_optimize( - dk.pair, - X, - y, - train_weights, - X_test, - y_test, - test_weights, - model_training_parameters, - ) - if optuna_period_params: - self.__optuna_period_params[dk.pair] = optuna_period_params - if optuna_period_rmse: - self.__optuna_period_rmse[dk.pair] = optuna_period_rmse - - if self.__optuna_period_params.get(dk.pair): - train_window = self.__optuna_period_params[dk.pair].get( - "train_period_candles" - ) - X = X.iloc[-train_window:] - y = y.iloc[-train_window:] - train_weights = train_weights[-train_window:] - - test_window = self.__optuna_period_params[dk.pair].get( - "test_period_candles" - ) - X_test = X_test.iloc[-test_window:] - y_test = y_test.iloc[-test_window:] - test_weights = test_weights[-test_window:] - - model = LGBMRegressor(objective="regression", **model_training_parameters) - - eval_set, eval_weights = self.eval_set_and_weights(X_test, y_test, test_weights) - - model.fit( - X=X, - y=y, - sample_weight=train_weights, - eval_set=eval_set, - eval_sample_weight=eval_weights, - 
eval_metric="rmse", - init_model=init_model, - ) - time_spent = time.time() - start - self.dd.update_metric_tracker("fit_time", time_spent, dk.pair) - - return model - - def get_label_period_candles(self, pair: str) -> int: - if self.__optuna_period_params.get(pair, {}).get("label_period_candles"): - return self.__optuna_period_params[pair]["label_period_candles"] - return self.ft_params["label_period_candles"] - - def fit_live_predictions(self, dk: FreqaiDataKitchen, pair: str) -> None: - warmed_up = True - - num_candles = self.freqai_info.get("fit_live_predictions_candles", 100) - if self.live: - if not hasattr(self, "exchange_candles"): - self.exchange_candles = len(self.dd.model_return_values[pair].index) - candle_diff = len(self.dd.historic_predictions[pair].index) - ( - num_candles + self.exchange_candles - ) - if candle_diff < 0: - logger.warning( - f"{pair}: fit live predictions not warmed up yet. Still {abs(candle_diff)} candles to go." - ) - warmed_up = False - - pred_df_full = ( - self.dd.historic_predictions[pair] - .iloc[-num_candles:] - .reset_index(drop=True) - ) - - if not warmed_up: - dk.data["extra_returns_per_train"][MINIMA_THRESHOLD_COLUMN] = -2 - dk.data["extra_returns_per_train"][MAXIMA_THRESHOLD_COLUMN] = 2 - else: - label_period_candles = self.get_label_period_candles(pair) - min_pred, max_pred = self.min_max_pred( - pred_df_full, - num_candles, - label_period_candles, - ) - dk.data["extra_returns_per_train"][MINIMA_THRESHOLD_COLUMN] = min_pred - dk.data["extra_returns_per_train"][MAXIMA_THRESHOLD_COLUMN] = max_pred - - dk.data["labels_mean"], dk.data["labels_std"] = {}, {} - for label in dk.label_list + dk.unique_class_list: - if pred_df_full[label].dtype == object: - continue - if not warmed_up: - f = [0, 0] - else: - f = sp.stats.norm.fit(pred_df_full[label]) - dk.data["labels_mean"][label], dk.data["labels_std"][label] = f[0], f[1] - - # fit the DI_threshold - if not warmed_up: - f = [0, 0, 0] - cutoff = 2 - else: - di_values = pd.to_numeric(pred_df_full["DI_values"], errors="coerce") - di_values = di_values.dropna() - f = sp.stats.weibull_min.fit(di_values) - cutoff = sp.stats.weibull_min.ppf( - self.freqai_info.get("outlier_threshold", 0.999), *f - ) - - dk.data["DI_value_mean"] = pred_df_full["DI_values"].mean() - dk.data["DI_value_std"] = pred_df_full["DI_values"].std() - dk.data["extra_returns_per_train"]["DI_value_param1"] = f[0] - dk.data["extra_returns_per_train"]["DI_value_param2"] = f[1] - dk.data["extra_returns_per_train"]["DI_value_param3"] = f[2] - dk.data["extra_returns_per_train"]["DI_cutoff"] = cutoff - - dk.data["extra_returns_per_train"]["label_period_candles"] = ( - self.get_label_period_candles(pair) - ) - dk.data["extra_returns_per_train"]["hp_rmse"] = self.__optuna_hp_rmse.get( - pair, -1 - ) - dk.data["extra_returns_per_train"]["period_rmse"] = ( - self.__optuna_period_rmse.get(pair, -1) - ) - - def eval_set_and_weights(self, X_test, y_test, test_weights): - if self.data_split_parameters.get("test_size", TEST_SIZE) == 0: - eval_set = None - eval_weights = None - else: - eval_set = [(X_test, y_test)] - eval_weights = [test_weights] - - return eval_set, eval_weights - - def optuna_storage(self, pair: str) -> optuna.storages.BaseStorage | None: - storage_dir = self.full_path - storage_filename = f"optuna-{pair.split('/')[0]}" - storage_backend = self.__optuna_config.get("storage", "file") - if storage_backend == "sqlite": - storage = f"sqlite:///{storage_dir}/{storage_filename}.sqlite" - elif storage_backend == "file": - storage = 
optuna.storages.JournalStorage( - optuna.storages.journal.JournalFileBackend( - f"{storage_dir}/{storage_filename}.log" - ) - ) - return storage - - def min_max_pred( - self, - pred_df: pd.DataFrame, - fit_live_predictions_candles: int, - label_period_candles: int, - ) -> tuple[pd.Series, pd.Series]: - prediction_thresholds_smoothing = self.freqai_info.get( - "prediction_thresholds_smoothing", "quantile" - ) - smoothing_methods: dict = { - "quantile": self.quantile_min_max_pred, - "mean": LightGBMRegressorQuickAdapterV3.mean_min_max_pred, - "median": LightGBMRegressorQuickAdapterV3.median_min_max_pred, - } - return smoothing_methods.get( - prediction_thresholds_smoothing, smoothing_methods["quantile"] - )(pred_df, fit_live_predictions_candles, label_period_candles) - - def optuna_hp_enqueue_previous_best_trial( - self, - pair: str, - study: optuna.study.Study, - ) -> None: - study_namespace = "hp" - if self.__optuna_hp_params.get(pair): - study.enqueue_trial(self.__optuna_hp_params[pair]) - elif self.optuna_load_best_params(pair, study_namespace): - study.enqueue_trial(self.optuna_load_best_params(pair, study_namespace)) - - def optuna_hp_optimize( - self, - pair: str, - X, - y, - train_weights, - X_test, - y_test, - test_weights, - ) -> tuple[dict, float] | tuple[None, None]: - identifier = self.freqai_info.get("identifier", "no_id_provided") - study_namespace = "hp" - study_name = f"{identifier}-{study_namespace}-{pair}" - storage = self.optuna_storage(pair) - pruner = optuna.pruners.HyperbandPruner() - if self.__optuna_config.get("continuous", True): - LightGBMRegressorQuickAdapterV3.optuna_study_delete(study_name, storage) - study = optuna.create_study( - study_name=study_name, - sampler=optuna.samplers.TPESampler( - multivariate=True, - group=True, - ), - pruner=pruner, - direction=optuna.study.StudyDirection.MINIMIZE, - storage=storage, - load_if_exists=not self.__optuna_config.get("continuous", True), - ) - if self.__optuna_config.get("warm_start", True): - self.optuna_hp_enqueue_previous_best_trial(pair, study) - logger.info(f"Optuna {study_namespace} hyperopt started") - start = time.time() - try: - study.optimize( - lambda trial: hp_objective( - trial, - X, - y, - train_weights, - X_test, - y_test, - test_weights, - self.model_training_parameters, - ), - n_trials=self.__optuna_config.get("n_trials", N_TRIALS), - n_jobs=min( - self.__optuna_config.get("n_jobs", 1), - max(int(self.max_system_threads / 4), 1), - ), - timeout=self.__optuna_config.get("timeout", 7200), - gc_after_trial=True, - ) - except Exception as e: - time_spent = time.time() - start - logger.error( - f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): {e}", - exc_info=True, - ) - return None, None - time_spent = time.time() - start - if LightGBMRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False: - logger.error( - f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found" - ) - return None, None - logger.info(f"Optuna {study_namespace} hyperopt done ({time_spent:.2f} secs)") - - params = study.best_params - self.optuna_save_best_params(pair, study_namespace, params) - # log params - for key, value in {"rmse": study.best_value, **params}.items(): - logger.info(f"Optuna {study_namespace} hyperopt | {key:>20s} : {value}") - return params, study.best_value - - def optuna_period_enqueue_previous_best_trial( - self, - pair: str, - study: optuna.study.Study, - ) -> None: - study_namespace = "period" - if self.__optuna_period_params.get(pair): - 
study.enqueue_trial(self.__optuna_period_params[pair]) - elif self.optuna_load_best_params(pair, study_namespace): - study.enqueue_trial(self.optuna_load_best_params(pair, study_namespace)) - - def optuna_period_optimize( - self, - pair: str, - X, - y, - train_weights, - X_test, - y_test, - test_weights, - model_training_parameters, - ) -> tuple[dict, float] | tuple[None, None]: - identifier = self.freqai_info.get("identifier", "no_id_provided") - study_namespace = "period" - study_name = f"{identifier}-{study_namespace}-{pair}" - storage = self.optuna_storage(pair) - pruner = optuna.pruners.HyperbandPruner() - if self.__optuna_config.get("continuous", True): - LightGBMRegressorQuickAdapterV3.optuna_study_delete(study_name, storage) - study = optuna.create_study( - study_name=study_name, - sampler=optuna.samplers.TPESampler( - multivariate=True, - group=True, - ), - pruner=pruner, - direction=optuna.study.StudyDirection.MINIMIZE, - storage=storage, - load_if_exists=not self.__optuna_config.get("continuous", True), - ) - if self.__optuna_config.get("warm_start", True): - self.optuna_period_enqueue_previous_best_trial(pair, study) - logger.info(f"Optuna {study_namespace} hyperopt started") - start = time.time() - try: - study.optimize( - lambda trial: period_objective( - trial, - X, - y, - train_weights, - X_test, - y_test, - test_weights, - self.freqai_info.get("fit_live_predictions_candles", 100), - self.__optuna_config.get("candles_step", 10), - model_training_parameters, - ), - n_trials=self.__optuna_config.get("n_trials", N_TRIALS), - n_jobs=min( - self.__optuna_config.get("n_jobs", 1), - max(int(self.max_system_threads / 4), 1), - ), - timeout=self.__optuna_config.get("timeout", 7200), - gc_after_trial=True, - ) - except Exception as e: - time_spent = time.time() - start - logger.error( - f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): {e}", - exc_info=True, - ) - return None, None - time_spent = time.time() - start - if LightGBMRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False: - logger.error( - f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found" - ) - return None, None - logger.info(f"Optuna {study_namespace} hyperopt done ({time_spent:.2f} secs)") - - params = study.best_params - self.optuna_save_best_params(pair, study_namespace, params) - # log params - for key, value in {"rmse": study.best_value, **params}.items(): - logger.info(f"Optuna {study_namespace} hyperopt | {key:>20s} : {value}") - return params, study.best_value - - def optuna_save_best_params( - self, pair: str, namespace: str, best_params: dict - ) -> None: - best_params_path = Path( - self.full_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json" - ) - with best_params_path.open("w", encoding="utf-8") as write_file: - json.dump(best_params, write_file, indent=4) - - def optuna_load_best_params(self, pair: str, namespace: str) -> dict | None: - best_params_path = Path( - self.full_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json" - ) - if best_params_path.is_file(): - with best_params_path.open("r", encoding="utf-8") as read_file: - return json.load(read_file) - return None - - @staticmethod - def optuna_study_delete( - study_name: str, storage: optuna.storages.BaseStorage - ) -> None: - try: - optuna.delete_study(study_name=study_name, storage=storage) - except Exception: - pass - - @staticmethod - def optuna_study_load( - study_name: str, storage: optuna.storages.BaseStorage - ) -> optuna.study.Study | 
None: - try: - study = optuna.load_study(study_name=study_name, storage=storage) - except Exception: - study = None - return study - - @staticmethod - def optuna_study_has_best_params(study: optuna.study.Study | None) -> bool: - if study is None: - return False - try: - _ = study.best_params - return True - # file backend storage raises KeyError - except KeyError: - return False - # sqlite backend storage raises ValueError - except ValueError: - return False - - @staticmethod - def mean_min_max_pred( - pred_df: pd.DataFrame, - fit_live_predictions_candles: int, - label_period_candles: int, - ) -> tuple[pd.Series, pd.Series]: - pred_df_sorted = ( - pred_df.select_dtypes(exclude=["object"]) - .copy() - .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) - ) - - label_period_frequency: int = int( - fit_live_predictions_candles / (label_period_candles * 2) - ) - min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean() - max_pred = pred_df_sorted.iloc[:label_period_frequency].mean() - return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] - - @staticmethod - def median_min_max_pred( - pred_df: pd.DataFrame, - fit_live_predictions_candles: int, - label_period_candles: int, - ) -> tuple[pd.Series, pd.Series]: - pred_df_sorted = ( - pred_df.select_dtypes(exclude=["object"]) - .copy() - .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) - ) - - label_period_frequency: int = int( - fit_live_predictions_candles / (label_period_candles * 2) - ) - min_pred = pred_df_sorted.iloc[-label_period_frequency:].median() - max_pred = pred_df_sorted.iloc[:label_period_frequency].median() - return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] - - def quantile_min_max_pred( - self, - pred_df: pd.DataFrame, - fit_live_predictions_candles: int, - label_period_candles: int, - ) -> tuple[pd.Series, pd.Series]: - pred_df_sorted = ( - pred_df.select_dtypes(exclude=["object"]) - .copy() - .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) - ) - - label_period_frequency: int = int( - fit_live_predictions_candles / (label_period_candles * 2) - ) - q = self.freqai_info.get("quantile", 0.75) - min_pred = pred_df_sorted.iloc[-label_period_frequency:].quantile(1 - q) - max_pred = pred_df_sorted.iloc[:label_period_frequency].quantile(q) - return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] - - -def period_objective( - trial, - X, - y, - train_weights, - X_test, - y_test, - test_weights, - fit_live_predictions_candles: int, - candles_step: int, - model_training_parameters, -) -> float: - min_train_window: int = fit_live_predictions_candles * 2 - max_train_window: int = max(len(X), min_train_window) - train_window: int = trial.suggest_int( - "train_period_candles", min_train_window, max_train_window, step=candles_step - ) - X = X.iloc[-train_window:] - y = y.iloc[-train_window:] - train_weights = train_weights[-train_window:] - - min_test_window: int = fit_live_predictions_candles - max_test_window: int = max(len(X_test), min_test_window) - test_window: int = trial.suggest_int( - "test_period_candles", min_test_window, max_test_window, step=candles_step - ) - X_test = X_test.iloc[-test_window:] - y_test = y_test.iloc[-test_window:] - test_weights = test_weights[-test_window:] - - # Fit the model - model = LGBMRegressor(objective="regression", **model_training_parameters) - model.fit( - X=X, - y=y, - sample_weight=train_weights, - eval_set=[(X_test, y_test)], - eval_sample_weight=[test_weights], - eval_metric="rmse", - 
callbacks=[optuna.integration.LightGBMPruningCallback(trial, "rmse")], - ) - y_pred = model.predict(X_test) - - min_label_period_candles: int = max(fit_live_predictions_candles // 20, 20) - max_label_period_candles: int = min( - max(fit_live_predictions_candles // 6, min_label_period_candles), - test_window // 2, - ) - label_period_candles: int = trial.suggest_int( - "label_period_candles", - min_label_period_candles, - max_label_period_candles, - step=candles_step, - ) - label_window_length: int = label_period_candles * 2 - label_windows_length: int = ( - test_window // label_window_length - ) * label_window_length - if label_windows_length == 0 or label_window_length > test_window: - return float("inf") - y_test_period = [ - y_test.iloc[-label_windows_length:].to_numpy()[i : i + label_window_length] - for i in range(0, label_windows_length, label_window_length) - ] - test_weights_period = [ - test_weights[-label_windows_length:][i : i + label_window_length] - for i in range(0, label_windows_length, label_window_length) - ] - y_pred_period = [ - y_pred[-label_windows_length:][i : i + label_window_length] - for i in range(0, label_windows_length, label_window_length) - ] - - errors = [ - sklearn.metrics.root_mean_squared_error(y_t, y_p, sample_weight=t_w) - for y_t, y_p, t_w in zip(y_test_period, y_pred_period, test_weights_period) - ] - - return geometric_mean(errors) - - -def hp_objective( - trial, - X, - y, - train_weights, - X_test, - y_test, - test_weights, - model_training_parameters, -) -> float: - study_parameters = { - "num_leaves": trial.suggest_int("num_leaves", 2, 256), - "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True), - "min_child_samples": trial.suggest_int("min_child_samples", 5, 100), - "min_child_weight": trial.suggest_int("min_child_weight", 1, 200), - "subsample": trial.suggest_float("subsample", 0.6, 1.0), - "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0), - "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True), - "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True), - } - model_training_parameters = {**model_training_parameters, **study_parameters} - - # Fit the model - model = LGBMRegressor(objective="regression", **model_training_parameters) - model.fit( - X=X, - y=y, - sample_weight=train_weights, - eval_set=[(X_test, y_test)], - eval_sample_weight=[test_weights], - eval_metric="rmse", - callbacks=[optuna.integration.LightGBMPruningCallback(trial, "rmse")], - ) - y_pred = model.predict(X_test) - - error = sklearn.metrics.root_mean_squared_error( - y_test, y_pred, sample_weight=test_weights - ) - - return error diff --git a/quickadapter/user_data/freqaimodels/XGBoostRegressorQuickAdapterV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py similarity index 61% rename from quickadapter/user_data/freqaimodels/XGBoostRegressorQuickAdapterV3.py rename to quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py index 16a1b4e..cd86511 100644 --- a/quickadapter/user_data/freqaimodels/XGBoostRegressorQuickAdapterV3.py +++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py @@ -1,20 +1,21 @@ import logging import json -from statistics import geometric_mean -from typing import Any -from pathlib import Path - -from xgboost import XGBRegressor import time -from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel -from freqtrade.freqai.data_kitchen import FreqaiDataKitchen +import numpy as np import pandas as pd import scipy as sp import 
optuna import sklearn import warnings -N_TRIALS = 36 +from statistics import geometric_mean +from functools import cached_property +from typing import Any, Callable, Optional +from pathlib import Path +from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel +from freqtrade.freqai.data_kitchen import FreqaiDataKitchen + + TEST_SIZE = 0.1 EXTREMA_COLUMN = "&s-extrema" @@ -26,7 +27,7 @@ warnings.simplefilter(action="ignore", category=FutureWarning) logger = logging.getLogger(__name__) -class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): +class QuickAdapterRegressorV3(BaseRegressionModel): """ The following freqaimodel is released to sponsors of the non-profit FreqAI open-source project. If you find the FreqAI project useful, please consider supporting it by becoming a sponsor. @@ -43,7 +44,26 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): https://github.com/sponsors/robcaulk """ - version = "3.6.3" + version = "3.6.4" + + @cached_property + def __optuna_config(self) -> dict: + return { + **{ + "enabled": False, + "n_jobs": min( + self.freqai_info.get("optuna_hyperopt", {}).get("n_jobs", 1), + max(int(self.max_system_threads / 4), 1), + ), + "storage": "file", + "continuous": True, + "warm_start": True, + "n_trials": 36, + "timeout": 7200, + "candles_step": 10, + }, + **self.freqai_info.get("optuna_hyperopt", {}), + } def __init__(self, **kwargs): super().__init__(**kwargs) @@ -52,10 +72,16 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): raise ValueError( "FreqAI model requires StaticPairList method defined in pairlists configuration and pair_whitelist defined in exchange section configuration" ) - self.__optuna_config = self.freqai_info.get("optuna_hyperopt", {}) + if ( + self.freqai_info.get("identifier") is None + or self.freqai_info.get("identifier").strip() == "" + ): + raise ValueError( + "FreqAI model requires identifier defined in the freqai section configuration" + ) self.__optuna_hyperopt: bool = ( self.freqai_info.get("enabled", False) - and self.__optuna_config.get("enabled", False) + and self.__optuna_config.get("enabled") and self.data_split_parameters.get("test_size", TEST_SIZE) > 0 ) self.__optuna_hp_rmse: dict[str, float] = {} @@ -72,7 +98,7 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): self.optuna_load_best_params(pair, "period") or {} ) logger.info( - f"Initialized {self.__class__.__name__} model version {self.version}" + f"Initialized {self.__class__.__name__} {self.freqai_info.get('regressor', 'xgboost')} regressor model version {self.version}" ) def fit(self, data_dictionary: dict, dk: FreqaiDataKitchen, **kwargs) -> Any: @@ -92,17 +118,13 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): model_training_parameters = self.model_training_parameters - xgb_model = self.get_init_model(dk.pair) + init_model = self.get_init_model(dk.pair) start = time.time() if self.__optuna_hyperopt: - optuna_hp_params, optuna_hp_rmse = self.optuna_hp_optimize( + self.optuna_hp_optimize( dk.pair, X, y, train_weights, X_test, y_test, test_weights ) - if optuna_hp_params: - self.__optuna_hp_params[dk.pair] = optuna_hp_params - if optuna_hp_rmse: - self.__optuna_hp_rmse[dk.pair] = optuna_hp_rmse if self.__optuna_hp_params.get(dk.pair): model_training_parameters = { @@ -110,7 +132,7 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): **self.__optuna_hp_params[dk.pair], } - optuna_period_params, optuna_period_rmse = self.optuna_period_optimize( + self.optuna_period_optimize( dk.pair, X, y, @@ -120,10 
+142,6 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): test_weights, model_training_parameters, ) - if optuna_period_params: - self.__optuna_period_params[dk.pair] = optuna_period_params - if optuna_period_rmse: - self.__optuna_period_rmse[dk.pair] = optuna_period_rmse if self.__optuna_period_params.get(dk.pair): train_window = self.__optuna_period_params[dk.pair].get( @@ -140,21 +158,17 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): y_test = y_test.iloc[-test_window:] test_weights = test_weights[-test_window:] - model = XGBRegressor( - objective="reg:squarederror", - eval_metric="rmse", - **model_training_parameters, - ) - eval_set, eval_weights = self.eval_set_and_weights(X_test, y_test, test_weights) - model.fit( + model = train_regressor( + regressor=self.freqai_info.get("regressor", "xgboost"), X=X, y=y, - sample_weight=train_weights, + train_weights=train_weights, eval_set=eval_set, - sample_weight_eval_set=eval_weights, - xgb_model=xgb_model, + eval_weights=eval_weights, + model_training_parameters=model_training_parameters, + init_model=init_model, ) time_spent = time.time() - start self.dd.update_metric_tracker("fit_time", time_spent, dk.pair) @@ -250,20 +264,6 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): return eval_set, eval_weights - def optuna_storage(self, pair: str) -> optuna.storages.BaseStorage | None: - storage_dir = self.full_path - storage_filename = f"optuna-{pair.split('/')[0]}" - storage_backend = self.__optuna_config.get("storage", "file") - if storage_backend == "sqlite": - storage = f"sqlite:///{storage_dir}/{storage_filename}.sqlite" - elif storage_backend == "file": - storage = optuna.storages.JournalStorage( - optuna.storages.journal.JournalFileBackend( - f"{storage_dir}/{storage_filename}.log" - ) - ) - return storage - def min_max_pred( self, pred_df: pd.DataFrame, @@ -275,185 +275,202 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): ) smoothing_methods: dict = { "quantile": self.quantile_min_max_pred, - "mean": XGBoostRegressorQuickAdapterV3.mean_min_max_pred, - "median": XGBoostRegressorQuickAdapterV3.median_min_max_pred, + "mean": QuickAdapterRegressorV3.mean_min_max_pred, + "median": QuickAdapterRegressorV3.median_min_max_pred, } return smoothing_methods.get( prediction_thresholds_smoothing, smoothing_methods["quantile"] )(pred_df, fit_live_predictions_candles, label_period_candles) - def optuna_hp_enqueue_previous_best_trial( + def optuna_hp_optimize( self, pair: str, - study: optuna.study.Study, - ) -> None: - study_namespace = "hp" - if self.__optuna_hp_params.get(pair): - study.enqueue_trial(self.__optuna_hp_params[pair]) - elif self.optuna_load_best_params(pair, study_namespace): - study.enqueue_trial(self.optuna_load_best_params(pair, study_namespace)) + X: pd.DataFrame, + y: pd.DataFrame, + train_weights: np.ndarray, + X_test: pd.DataFrame, + y_test: pd.DataFrame, + test_weights: np.ndarray, + ) -> tuple[dict, float] | tuple[None, None]: + namespace = "hp" + identifier = self.freqai_info["identifier"] + study = self.optuna_create_study(f"{identifier}-{namespace}-{pair}", pair) + if study is None: + return None, None - def optuna_hp_optimize( + if self.__optuna_config.get("warm_start"): + self.optuna_enqueue_previous_best_params(pair, study, namespace) + + def objective(trial: optuna.Trial) -> float: + return hp_objective( + trial, + self.freqai_info.get("regressor", "xgboost"), + X, + y, + train_weights, + X_test, + y_test, + test_weights, + self.model_training_parameters, + ) + + return 
self.optuna_process_study( + study=study, pair=pair, namespace=namespace, objective=objective + ) + + def optuna_period_optimize( self, pair: str, - X, - y, - train_weights, - X_test, - y_test, - test_weights, + X: pd.DataFrame, + y: pd.DataFrame, + train_weights: np.ndarray, + X_test: pd.DataFrame, + y_test: pd.DataFrame, + test_weights: np.ndarray, + model_training_parameters: dict, ) -> tuple[dict, float] | tuple[None, None]: - identifier = self.freqai_info.get("identifier", "no_id_provided") - study_namespace = "hp" - study_name = f"{identifier}-{study_namespace}-{pair}" - storage = self.optuna_storage(pair) - pruner = optuna.pruners.HyperbandPruner() - if self.__optuna_config.get("continuous", True): - XGBoostRegressorQuickAdapterV3.optuna_study_delete(study_name, storage) - study = optuna.create_study( - study_name=study_name, - sampler=optuna.samplers.TPESampler( - multivariate=True, - group=True, - ), - pruner=pruner, - direction=optuna.study.StudyDirection.MINIMIZE, - storage=storage, - load_if_exists=not self.__optuna_config.get("continuous", True), + namespace = "period" + identifier = self.freqai_info["identifier"] + study = self.optuna_create_study(f"{identifier}-{namespace}-{pair}", pair) + if study is None: + return None, None + + if self.__optuna_config.get("warm_start"): + self.optuna_enqueue_previous_best_params(pair, study, namespace) + + def objective(trial: optuna.Trial) -> float: + return period_objective( + trial, + self.freqai_info.get("regressor", "xgboost"), + X, + y, + train_weights, + X_test, + y_test, + test_weights, + self.freqai_info.get("fit_live_predictions_candles", 100), + self.__optuna_config.get("candles_step"), + model_training_parameters, + ) + + return self.optuna_process_study( + study=study, pair=pair, namespace=namespace, objective=objective ) - if self.__optuna_config.get("warm_start", True): - self.optuna_hp_enqueue_previous_best_trial(pair, study) - logger.info(f"Optuna {study_namespace} hyperopt started") - start = time.time() + + def optuna_storage(self, pair: str) -> Optional[optuna.storages.BaseStorage]: + storage_dir = self.full_path + storage_filename = f"optuna-{pair.split('/')[0]}" + storage_backend = self.__optuna_config.get("storage") + if storage_backend == "sqlite": + storage = f"sqlite:///{storage_dir}/{storage_filename}.sqlite" + elif storage_backend == "file": + storage = optuna.storages.JournalStorage( + optuna.storages.journal.JournalFileBackend( + f"{storage_dir}/{storage_filename}.log" + ) + ) + return storage + + def optuna_create_study( + self, study_name: str, pair: str + ) -> Optional[optuna.study.Study]: + storage = self.optuna_storage(pair) + if storage is None: + logger.error(f"Failed to create optuna storage for {study_name}") + return None + + if self.__optuna_config.get("continuous"): + self.optuna_study_delete(study_name, storage) + try: - study.optimize( - lambda trial: hp_objective( - trial, - X, - y, - train_weights, - X_test, - y_test, - test_weights, - self.model_training_parameters, - ), - n_trials=self.__optuna_config.get("n_trials", N_TRIALS), - n_jobs=min( - self.__optuna_config.get("n_jobs", 1), - max(int(self.max_system_threads / 4), 1), - ), - timeout=self.__optuna_config.get("timeout", 7200), - gc_after_trial=True, + return optuna.create_study( + study_name=study_name, + sampler=optuna.samplers.TPESampler(multivariate=True, group=True), + pruner=optuna.pruners.HyperbandPruner(), + direction=optuna.study.StudyDirection.MINIMIZE, + storage=storage, + load_if_exists=not 
self.__optuna_config.get("continuous"), ) except Exception as e: - time_spent = time.time() - start logger.error( - f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): {e}", - exc_info=True, + f"Failed to create optuna study {study_name}: {str(e)}", exc_info=True ) - return None, None - time_spent = time.time() - start - if XGBoostRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False: + return None + + def optuna_enqueue_previous_best_params( + self, pair: str, study: optuna.study.Study, namespace: str + ) -> None: + best_params = getattr( + self, f"_{self.__class__.__name__}__optuna_{namespace}_params" + ).get(pair) + if best_params: + study.enqueue_trial(best_params) + else: + best_params = self.optuna_load_best_params(pair, namespace) + if best_params: + study.enqueue_trial(best_params) + + def optuna_handle_error( + self, namespace: str, start_time: float, e: Exception + ) -> None: + time_spent = time.time() - start_time + logger.error( + f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): {str(e)}", + exc_info=True, + ) + + def optuna_process_results( + self, study: optuna.study.Study, pair: str, namespace: str, start_time: float + ) -> tuple[dict, float] | tuple[None, None]: + time_spent = time.time() - start_time + + if not self.optuna_study_has_best_params(study): logger.error( - f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found" + f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found" ) return None, None - logger.info(f"Optuna {study_namespace} hyperopt done ({time_spent:.2f} secs)") params = study.best_params - self.optuna_save_best_params(pair, study_namespace, params) - # log params - for key, value in {"rmse": study.best_value, **params}.items(): - logger.info(f"Optuna {study_namespace} hyperopt | {key:>20s} : {value}") - return params, study.best_value + rmse = study.best_value - def optuna_period_enqueue_previous_best_trial( - self, - pair: str, - study: optuna.study.Study, - ) -> None: - study_namespace = "period" - if self.__optuna_period_params.get(pair): - study.enqueue_trial(self.__optuna_period_params[pair]) - elif self.optuna_load_best_params(pair, study_namespace): - study.enqueue_trial(self.optuna_load_best_params(pair, study_namespace)) + logger.info(f"Optuna {namespace} hyperopt done ({time_spent:.2f} secs)") + for key, value in {"rmse": rmse, **params}.items(): + logger.info(f"Optuna {namespace} hyperopt | {key:>20s} : {value}") - def optuna_period_optimize( + if namespace == "hp": + self.__optuna_hp_params[pair] = params + self.__optuna_hp_rmse[pair] = rmse + elif namespace == "period": + self.__optuna_period_params[pair] = params + self.__optuna_period_rmse[pair] = rmse + + self.optuna_save_best_params(pair, namespace, params) + + return params, rmse + + def optuna_process_study( self, + study: optuna.study.Study, pair: str, - X, - y, - train_weights, - X_test, - y_test, - test_weights, - model_training_parameters, + namespace: str, + objective: Callable[[optuna.Trial], float], ) -> tuple[dict, float] | tuple[None, None]: - identifier = self.freqai_info.get("identifier", "no_id_provided") - study_namespace = "period" - study_name = f"{identifier}-{study_namespace}-{pair}" - storage = self.optuna_storage(pair) - pruner = optuna.pruners.HyperbandPruner() - if self.__optuna_config.get("continuous", True): - XGBoostRegressorQuickAdapterV3.optuna_study_delete(study_name, storage) - study = optuna.create_study( - study_name=study_name, - 
sampler=optuna.samplers.TPESampler( - multivariate=True, - group=True, - ), - pruner=pruner, - direction=optuna.study.StudyDirection.MINIMIZE, - storage=storage, - load_if_exists=not self.__optuna_config.get("continuous", True), - ) - if self.__optuna_config.get("warm_start", True): - self.optuna_period_enqueue_previous_best_trial(pair, study) - logger.info(f"Optuna {study_namespace} hyperopt started") - start = time.time() + logger.info(f"Optuna {namespace} hyperopt started") + start_time = time.time() + try: study.optimize( - lambda trial: period_objective( - trial, - X, - y, - train_weights, - X_test, - y_test, - test_weights, - self.freqai_info.get("fit_live_predictions_candles", 100), - self.__optuna_config.get("candles_step", 10), - model_training_parameters, - ), - n_trials=self.__optuna_config.get("n_trials", N_TRIALS), - n_jobs=min( - self.__optuna_config.get("n_jobs", 1), - max(int(self.max_system_threads / 4), 1), - ), - timeout=self.__optuna_config.get("timeout", 7200), + objective, + n_trials=self.__optuna_config.get("n_trials"), + n_jobs=self.__optuna_config.get("n_jobs"), + timeout=self.__optuna_config.get("timeout"), gc_after_trial=True, ) except Exception as e: - time_spent = time.time() - start - logger.error( - f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): {e}", - exc_info=True, - ) + self.optuna_handle_error(namespace, start_time, e) return None, None - time_spent = time.time() - start - if XGBoostRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False: - logger.error( - f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found" - ) - return None, None - logger.info(f"Optuna {study_namespace} hyperopt done ({time_spent:.2f} secs)") - params = study.best_params - self.optuna_save_best_params(pair, study_namespace, params) - # log params - for key, value in {"rmse": study.best_value, **params}.items(): - logger.info(f"Optuna {study_namespace} hyperopt | {key:>20s} : {value}") - return params, study.best_value + return self.optuna_process_results(study, pair, namespace, start_time) def optuna_save_best_params( self, pair: str, namespace: str, best_params: dict @@ -461,10 +478,17 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): best_params_path = Path( self.full_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json" ) - with best_params_path.open("w", encoding="utf-8") as write_file: - json.dump(best_params, write_file, indent=4) + try: + with best_params_path.open("w", encoding="utf-8") as write_file: + json.dump(best_params, write_file, indent=4) + except Exception as e: + logger.error( + f"Failed to save optuna {namespace} best params for {pair}: {str(e)}", + exc_info=True, + ) + raise - def optuna_load_best_params(self, pair: str, namespace: str) -> dict | None: + def optuna_load_best_params(self, pair: str, namespace: str) -> Optional[dict]: best_params_path = Path( self.full_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json" ) @@ -485,7 +509,7 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): @staticmethod def optuna_study_load( study_name: str, storage: optuna.storages.BaseStorage - ) -> optuna.study.Study | None: + ) -> Optional[optuna.study.Study]: try: study = optuna.load_study(study_name=study_name, storage=storage) except Exception: @@ -493,7 +517,7 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): return study @staticmethod - def optuna_study_has_best_params(study: optuna.study.Study | None) -> bool: + def 
optuna_study_has_best_params(study: Optional[optuna.study.Study]) -> bool: if study is None: return False try: @@ -513,9 +537,10 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): label_period_candles: int, ) -> tuple[pd.Series, pd.Series]: pred_df_sorted = ( - pred_df.select_dtypes(exclude=["object"]) + pred_df[[EXTREMA_COLUMN]] .copy() - .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) + .sort_values(by=EXTREMA_COLUMN, ascending=False) + .reset_index(drop=True) ) label_period_frequency: int = int( @@ -532,9 +557,10 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): label_period_candles: int, ) -> tuple[pd.Series, pd.Series]: pred_df_sorted = ( - pred_df.select_dtypes(exclude=["object"]) + pred_df[[EXTREMA_COLUMN]] .copy() - .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) + .sort_values(by=EXTREMA_COLUMN, ascending=False) + .reset_index(drop=True) ) label_period_frequency: int = int( @@ -551,9 +577,10 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): label_period_candles: int, ) -> tuple[pd.Series, pd.Series]: pred_df_sorted = ( - pred_df.select_dtypes(exclude=["object"]) + pred_df[[EXTREMA_COLUMN]] .copy() - .apply(lambda col: col.sort_values(ascending=False, ignore_index=True)) + .sort_values(by=EXTREMA_COLUMN, ascending=False) + .reset_index(drop=True) ) label_period_frequency: int = int( @@ -565,17 +592,77 @@ class XGBoostRegressorQuickAdapterV3(BaseRegressionModel): return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN] -def period_objective( - trial, +def get_callbacks(trial: optuna.Trial, regressor: str) -> list: + if regressor == "xgboost": + callbacks = [ + optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse") + ] + elif regressor == "lightgbm": + callbacks = [optuna.integration.LightGBMPruningCallback(trial, "rmse")] + else: + raise ValueError(f"Unsupported regressor model: {regressor}") + return callbacks + + +def train_regressor( + regressor: str, X, y, train_weights, - X_test, - y_test, - test_weights, + eval_set, + eval_weights, + model_training_parameters: dict, + init_model: Any = None, + callbacks: list = None, +) -> Any: + if regressor == "xgboost": + from xgboost import XGBRegressor + + model = XGBRegressor( + objective="reg:squarederror", + eval_metric="rmse", + callbacks=callbacks, + **model_training_parameters, + ) + model.fit( + X=X, + y=y, + sample_weight=train_weights, + eval_set=eval_set, + sample_weight_eval_set=eval_weights, + xgb_model=init_model, + ) + elif regressor == "lightgbm": + from lightgbm import LGBMRegressor + + model = LGBMRegressor(objective="regression", **model_training_parameters) + model.fit( + X=X, + y=y, + sample_weight=train_weights, + eval_set=eval_set, + eval_sample_weight=eval_weights, + eval_metric="rmse", + init_model=init_model, + callbacks=callbacks, + ) + else: + raise ValueError(f"Unsupported regressor model: {regressor}") + return model + + +def period_objective( + trial: optuna.Trial, + regressor: str, + X: pd.DataFrame, + y: pd.DataFrame, + train_weights: np.ndarray, + X_test: pd.DataFrame, + y_test: pd.DataFrame, + test_weights: np.ndarray, fit_live_predictions_candles: int, candles_step: int, - model_training_parameters, + model_training_parameters: dict, ) -> float: min_train_window: int = fit_live_predictions_candles * 2 max_train_window: int = max(len(X), min_train_window) @@ -595,21 +682,15 @@ def period_objective( y_test = y_test.iloc[-test_window:] test_weights = test_weights[-test_window:] - # Fit the model - model = 
XGBRegressor( - objective="reg:squarederror", - eval_metric="rmse", - callbacks=[ - optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse") - ], - **model_training_parameters, - ) - model.fit( + model = train_regressor( + regressor=regressor, X=X, y=y, - sample_weight=train_weights, + train_weights=train_weights, eval_set=[(X_test, y_test)], - sample_weight_eval_set=[test_weights], + eval_weights=[test_weights], + model_training_parameters=model_training_parameters, + callbacks=get_callbacks(trial, regressor), ) y_pred = model.predict(X_test) @@ -651,42 +732,54 @@ def period_objective( return geometric_mean(errors) -def hp_objective( - trial, - X, - y, - train_weights, - X_test, - y_test, - test_weights, - model_training_parameters, -) -> float: - study_parameters = { +def get_optuna_study_model_parameters(trial: optuna.Trial, regressor: str) -> dict: + study_model_parameters = { "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True), - "max_depth": trial.suggest_int("max_depth", 3, 18), "min_child_weight": trial.suggest_int("min_child_weight", 1, 200), "subsample": trial.suggest_float("subsample", 0.6, 1.0), "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0), "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True), "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True), } - model_training_parameters = {**model_training_parameters, **study_parameters} + if regressor == "xgboost": + study_model_parameters.update( + { + "max_depth": trial.suggest_int("max_depth", 3, 18), + } + ) + elif regressor == "lightgbm": + study_model_parameters.update( + { + "num_leaves": trial.suggest_int("num_leaves", 2, 256), + "min_child_samples": trial.suggest_int("min_child_samples", 5, 100), + } + ) + return study_model_parameters - # Fit the model - model = XGBRegressor( - objective="reg:squarederror", - eval_metric="rmse", - callbacks=[ - optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse") - ], - **model_training_parameters, - ) - model.fit( + +def hp_objective( + trial: optuna.Trial, + regressor: str, + X: pd.DataFrame, + y: pd.DataFrame, + train_weights: np.ndarray, + X_test: pd.DataFrame, + y_test: pd.DataFrame, + test_weights: np.ndarray, + model_training_parameters: dict, +) -> float: + study_model_parameters = get_optuna_study_model_parameters(trial, regressor) + model_training_parameters = {**model_training_parameters, **study_model_parameters} + + model = train_regressor( + regressor=regressor, X=X, y=y, - sample_weight=train_weights, + train_weights=train_weights, eval_set=[(X_test, y_test)], - sample_weight_eval_set=[test_weights], + eval_weights=[test_weights], + model_training_parameters=model_training_parameters, + callbacks=get_callbacks(trial, regressor), ) y_pred = model.predict(X_test) diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py index f2a39f5..8fa3dc2 100644 --- a/quickadapter/user_data/strategies/QuickAdapterV3.py +++ b/quickadapter/user_data/strategies/QuickAdapterV3.py @@ -1,6 +1,6 @@ import json import logging -from functools import reduce +from functools import reduce, cached_property import datetime import math from pathlib import Path @@ -58,15 +58,16 @@ class QuickAdapterV3(IStrategy): INTERFACE_VERSION = 3 + @cached_property def version(self) -> str: - return "3.2.6" + return "3.2.7" timeframe = "5m" stoploss = -0.02 use_custom_stoploss = True - @property + @cached_property def trailing_stoploss_natr_ratio(self) 
-> float: return self.config.get("trailing_stoploss_natr_ratio", 0.025) @@ -76,7 +77,7 @@ class QuickAdapterV3(IStrategy): trailing_stop_positive_offset = 0.011 trailing_only_offset_is_reached = True - @property + @cached_property def entry_natr_ratio(self) -> float: return self.config.get("entry_pricing", {}).get("entry_natr_ratio", 0.0025) @@ -84,7 +85,7 @@ class QuickAdapterV3(IStrategy): # reward_risk_ratio = 1.0 means 1:1 RR # reward_risk_ratio = 2.0 means 1:2 RR # ... - @property + @cached_property def reward_risk_ratio(self) -> float: return self.config.get("exit_pricing", {}).get("reward_risk_ratio", 2.0) @@ -105,11 +106,11 @@ class QuickAdapterV3(IStrategy): process_only_new_candles = True - @property + @cached_property def can_short(self) -> bool: return self.is_short_allowed() - @property + @cached_property def plot_config(self) -> dict: return { "main_plot": {}, @@ -130,7 +131,7 @@ class QuickAdapterV3(IStrategy): }, } - @property + @cached_property def protections(self) -> list[dict]: fit_live_predictions_candles = self.freqai_info.get( "fit_live_predictions_candles", 100 @@ -155,7 +156,7 @@ class QuickAdapterV3(IStrategy): use_exit_signal = True - @property + @cached_property def startup_candle_count(self) -> int: # Match the predictions warmup period return self.freqai_info.get("fit_live_predictions_candles", 100) @@ -166,6 +167,13 @@ class QuickAdapterV3(IStrategy): raise ValueError( "FreqAI strategy requires StaticPairList method defined in pairlists configuration and pair_whitelist defined in exchange section configuration" ) + if ( + self.freqai_info.get("identifier") is None + or self.freqai_info.get("identifier").strip() == "" + ): + raise ValueError( + "FreqAI strategy requires identifier defined in the freqai section configuration" + ) self.models_full_path = Path( self.config["user_data_dir"] / "models" @@ -414,21 +422,23 @@ class QuickAdapterV3(IStrategy): def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame: return df - def get_trade_entry_candle(self, df: DataFrame, trade: Trade) -> DataFrame | None: + def get_trade_entry_candle( + self, df: DataFrame, trade: Trade + ) -> Optional[DataFrame]: entry_date = timeframe_to_prev_date(self.timeframe, trade.open_date_utc) entry_candle = df.loc[(df["date"] == entry_date)] if entry_candle.empty: return None return entry_candle - def get_trade_entry_natr(self, df: DataFrame, trade: Trade) -> float | None: + def get_trade_entry_natr(self, df: DataFrame, trade: Trade) -> Optional[float]: entry_candle = self.get_trade_entry_candle(df, trade) if entry_candle is None: return None entry_candle = entry_candle.squeeze() return entry_candle["natr_labeling_window"] - def get_trade_duration_candles(self, df: DataFrame, trade: Trade) -> int | None: + def get_trade_duration_candles(self, df: DataFrame, trade: Trade) -> Optional[int]: """ Get the number of candles since the trade entry. 
:param df: DataFrame with the current data @@ -458,7 +468,7 @@ class QuickAdapterV3(IStrategy): def get_stoploss_distance( self, df: DataFrame, trade: Trade, current_rate: float - ) -> float | None: + ) -> Optional[float]: trade_duration_candles = self.get_trade_duration_candles(df, trade) if QuickAdapterV3.is_trade_duration_valid(trade_duration_candles) is False: return None @@ -472,7 +482,7 @@ class QuickAdapterV3(IStrategy): * (1 / math.log10(1 + 0.25 * trade_duration_candles)) ) - def get_take_profit_distance(self, df: DataFrame, trade: Trade) -> float | None: + def get_take_profit_distance(self, df: DataFrame, trade: Trade) -> Optional[float]: trade_duration_candles = self.get_trade_duration_candles(df, trade) if QuickAdapterV3.is_trade_duration_valid(trade_duration_candles) is False: return None @@ -498,7 +508,7 @@ class QuickAdapterV3(IStrategy): current_rate: float, current_profit: float, **kwargs, - ) -> float | None: + ) -> Optional[float]: df, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe) if df.empty: @@ -525,7 +535,7 @@ class QuickAdapterV3(IStrategy): current_rate: float, current_profit: float, **kwargs, - ) -> str | None: + ) -> Optional[str]: df, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe) if df.empty: @@ -671,7 +681,7 @@ class QuickAdapterV3(IStrategy): smoothing_methods["gaussian"], ) - def load_period_best_params(self, pair: str) -> dict | None: + def load_period_best_params(self, pair: str) -> Optional[dict]: namespace = "period" best_params_path = Path( self.models_full_path diff --git a/quickadapter/user_data/strategies/Utils.py b/quickadapter/user_data/strategies/Utils.py index 5e3698a..00b8a82 100644 --- a/quickadapter/user_data/strategies/Utils.py +++ b/quickadapter/user_data/strategies/Utils.py @@ -1,6 +1,7 @@ import numpy as np import pandas as pd import talib.abstract as ta +from typing import Callable from scipy.signal import convolve from scipy.signal.windows import gaussian from technical import qtpylib @@ -127,7 +128,7 @@ def zero_lag_series(series: pd.Series, period: int) -> pd.Series: return 2 * series - series.shift(lag) -def get_ma_fn(mamode: str) -> callable: +def get_ma_fn(mamode: str) -> Callable[[pd.Series, int], pd.Series]: mamodes: dict = { "sma": ta.SMA, "ema": ta.EMA, @@ -229,7 +230,7 @@ def smma(series: pd.Series, period: int, zero_lag=False, offset=0) -> pd.Series: return smma -def get_price_fn(pricemode: str) -> callable: +def get_price_fn(pricemode: str) -> Callable[[pd.DataFrame], pd.Series]: pricemodes = { "average": ta.AVGPRICE, "median": ta.MEDPRICE, -- 2.43.0