output = output.rolling(window=self.CONV_WIDTH).apply(_predict)
return output
- def get_storage(self, pair: str | None = None) -> BaseStorage | None:
+ def get_storage(self, pair: Optional[str] = None) -> Optional[BaseStorage]:
"""
Get the storage for Optuna
"""
return storage
@staticmethod
- def study_has_best_trial_params(study: Study | None) -> bool:
+ def study_has_best_trial_params(study: Optional[Study]) -> bool:
if study is None:
return False
try:
def study(
self, train_df: DataFrame, total_timesteps: int, dk: FreqaiDataKitchen
- ) -> Dict | None:
+ ) -> Optional[Dict]:
"""
Runs hyperparameter optimization using Optuna and
returns the best hyperparameters found, merged with the user-defined parameters
return {**self.model_training_parameters, **best_trial_params}
def save_best_trial_params(
- self, best_trial_params: Dict, pair: str | None = None
+ self, best_trial_params: Dict, pair: Optional[str] = None
) -> None:
"""
Save the best trial hyperparameters found during hyperparameter optimization
else f"saving best params to {best_trial_params_path} JSON file"
)
logger.info(log_msg)
- with best_trial_params_path.open("w", encoding="utf-8") as write_file:
- json.dump(best_trial_params, write_file, indent=4)
+ try:
+ with best_trial_params_path.open("w", encoding="utf-8") as write_file:
+ json.dump(best_trial_params, write_file, indent=4)
+ except Exception as e:
+ logger.error(
+ f"Error saving best trial params to {best_trial_params_path}: {e}",
+ exc_info=True,
+ )
+ raise
- def load_best_trial_params(self, pair: str | None = None) -> Dict | None:
+ def load_best_trial_params(self, pair: Optional[str] = None) -> Optional[Dict]:
"""
Load the best trial hyperparameters found and saved during hyperparameter optimization
"""
{
"$schema": "https://schema.freqtrade.io/schema.json",
"strategy": "QuickAdapterV3",
- "freqaimodel": "XGBoostRegressorQuickAdapterV3",
- // "freqaimodel": "LightGBMRegressorQuickAdapterV3",
+ "freqaimodel": "QuickAdapterRegressorV3",
"max_open_trades": 10,
"stake_currency": "USDT",
"stake_amount": "unlimited",
},
"freqai": {
"enabled": true,
+ "regressor": "xgboost",
+ // "regressor": "lightgbm",
"conv_width": 1,
"purge_old_models": 2,
"expiration_hours": 12,
"backtest_period_days": 2,
"write_metrics_to_disk": false,
"identifier": "quickadapter-xgboost",
- // "identifier": "quickadapter-lgbm",
+ // "identifier": "quickadapter-lightgbm",
"fit_live_predictions_candles": 600,
"data_kitchen_thread_count": 6, // set to number of CPU threads / 4
"track_performance": false,
+++ /dev/null
-import logging
-import json
-from statistics import geometric_mean
-from typing import Any
-from pathlib import Path
-
-from lightgbm import LGBMRegressor
-import time
-from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
-from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
-import pandas as pd
-import scipy as sp
-import optuna
-import sklearn
-import warnings
-
-N_TRIALS = 36
-TEST_SIZE = 0.1
-
-EXTREMA_COLUMN = "&s-extrema"
-MINIMA_THRESHOLD_COLUMN = "&s-minima_threshold"
-MAXIMA_THRESHOLD_COLUMN = "&s-maxima_threshold"
-
-warnings.simplefilter(action="ignore", category=FutureWarning)
-
-logger = logging.getLogger(__name__)
-
-
-class LightGBMRegressorQuickAdapterV3(BaseRegressionModel):
- """
- The following freqaimodel is released to sponsors of the non-profit FreqAI open-source project.
- If you find the FreqAI project useful, please consider supporting it by becoming a sponsor.
- We use sponsor money to help stimulate new features and to pay for running these public
- experiments, with a an objective of helping the community make smarter choices in their
- ML journey.
-
- This freqaimodel is experimental (as with all models released to sponsors). Do *not* expect
- returns. The goal is to demonstrate gratitude to people who support the project and to
- help them find a good starting point for their own creativity.
-
- If you have questions, please direct them to our discord: https://discord.gg/xE4RMg4QYw
-
- https://github.com/sponsors/robcaulk
- """
-
- version = "3.6.3"
-
- def __init__(self, **kwargs):
- super().__init__(**kwargs)
- self.pairs = self.config.get("exchange", {}).get("pair_whitelist")
- if not self.pairs:
- raise ValueError(
- "FreqAI model requires StaticPairList method defined in pairlists configuration and pair_whitelist defined in exchange section configuration"
- )
- self.__optuna_config = self.freqai_info.get("optuna_hyperopt", {})
- self.__optuna_hyperopt: bool = (
- self.freqai_info.get("enabled", False)
- and self.__optuna_config.get("enabled", False)
- and self.data_split_parameters.get("test_size", TEST_SIZE) > 0
- )
- self.__optuna_hp_rmse: dict[str, float] = {}
- self.__optuna_period_rmse: dict[str, float] = {}
- self.__optuna_hp_params: dict[str, dict] = {}
- self.__optuna_period_params: dict[str, dict] = {}
- for pair in self.pairs:
- self.__optuna_hp_rmse[pair] = -1
- self.__optuna_period_rmse[pair] = -1
- self.__optuna_hp_params[pair] = (
- self.optuna_load_best_params(pair, "hp") or {}
- )
- self.__optuna_period_params[pair] = (
- self.optuna_load_best_params(pair, "period") or {}
- )
- logger.info(
- f"Initialized {self.__class__.__name__} model version {self.version}"
- )
-
- def fit(self, data_dictionary: dict, dk: FreqaiDataKitchen, **kwargs) -> Any:
- """
- User sets up the training and test data to fit their desired model here
- :param data_dictionary: the dictionary constructed by DataHandler to hold
- all the training and test data/labels.
- """
-
- X = data_dictionary["train_features"]
- y = data_dictionary["train_labels"]
- train_weights = data_dictionary["train_weights"]
-
- X_test = data_dictionary["test_features"]
- y_test = data_dictionary["test_labels"]
- test_weights = data_dictionary["test_weights"]
-
- model_training_parameters = self.model_training_parameters
-
- init_model = self.get_init_model(dk.pair)
-
- start = time.time()
- if self.__optuna_hyperopt:
- optuna_hp_params, optuna_hp_rmse = self.optuna_hp_optimize(
- dk.pair, X, y, train_weights, X_test, y_test, test_weights
- )
- if optuna_hp_params:
- self.__optuna_hp_params[dk.pair] = optuna_hp_params
- if optuna_hp_rmse:
- self.__optuna_hp_rmse[dk.pair] = optuna_hp_rmse
-
- if self.__optuna_hp_params.get(dk.pair):
- model_training_parameters = {
- **model_training_parameters,
- **self.__optuna_hp_params[dk.pair],
- }
-
- optuna_period_params, optuna_period_rmse = self.optuna_period_optimize(
- dk.pair,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- model_training_parameters,
- )
- if optuna_period_params:
- self.__optuna_period_params[dk.pair] = optuna_period_params
- if optuna_period_rmse:
- self.__optuna_period_rmse[dk.pair] = optuna_period_rmse
-
- if self.__optuna_period_params.get(dk.pair):
- train_window = self.__optuna_period_params[dk.pair].get(
- "train_period_candles"
- )
- X = X.iloc[-train_window:]
- y = y.iloc[-train_window:]
- train_weights = train_weights[-train_window:]
-
- test_window = self.__optuna_period_params[dk.pair].get(
- "test_period_candles"
- )
- X_test = X_test.iloc[-test_window:]
- y_test = y_test.iloc[-test_window:]
- test_weights = test_weights[-test_window:]
-
- model = LGBMRegressor(objective="regression", **model_training_parameters)
-
- eval_set, eval_weights = self.eval_set_and_weights(X_test, y_test, test_weights)
-
- model.fit(
- X=X,
- y=y,
- sample_weight=train_weights,
- eval_set=eval_set,
- eval_sample_weight=eval_weights,
- eval_metric="rmse",
- init_model=init_model,
- )
- time_spent = time.time() - start
- self.dd.update_metric_tracker("fit_time", time_spent, dk.pair)
-
- return model
-
- def get_label_period_candles(self, pair: str) -> int:
- if self.__optuna_period_params.get(pair, {}).get("label_period_candles"):
- return self.__optuna_period_params[pair]["label_period_candles"]
- return self.ft_params["label_period_candles"]
-
- def fit_live_predictions(self, dk: FreqaiDataKitchen, pair: str) -> None:
- warmed_up = True
-
- num_candles = self.freqai_info.get("fit_live_predictions_candles", 100)
- if self.live:
- if not hasattr(self, "exchange_candles"):
- self.exchange_candles = len(self.dd.model_return_values[pair].index)
- candle_diff = len(self.dd.historic_predictions[pair].index) - (
- num_candles + self.exchange_candles
- )
- if candle_diff < 0:
- logger.warning(
- f"{pair}: fit live predictions not warmed up yet. Still {abs(candle_diff)} candles to go."
- )
- warmed_up = False
-
- pred_df_full = (
- self.dd.historic_predictions[pair]
- .iloc[-num_candles:]
- .reset_index(drop=True)
- )
-
- if not warmed_up:
- dk.data["extra_returns_per_train"][MINIMA_THRESHOLD_COLUMN] = -2
- dk.data["extra_returns_per_train"][MAXIMA_THRESHOLD_COLUMN] = 2
- else:
- label_period_candles = self.get_label_period_candles(pair)
- min_pred, max_pred = self.min_max_pred(
- pred_df_full,
- num_candles,
- label_period_candles,
- )
- dk.data["extra_returns_per_train"][MINIMA_THRESHOLD_COLUMN] = min_pred
- dk.data["extra_returns_per_train"][MAXIMA_THRESHOLD_COLUMN] = max_pred
-
- dk.data["labels_mean"], dk.data["labels_std"] = {}, {}
- for label in dk.label_list + dk.unique_class_list:
- if pred_df_full[label].dtype == object:
- continue
- if not warmed_up:
- f = [0, 0]
- else:
- f = sp.stats.norm.fit(pred_df_full[label])
- dk.data["labels_mean"][label], dk.data["labels_std"][label] = f[0], f[1]
-
- # fit the DI_threshold
- if not warmed_up:
- f = [0, 0, 0]
- cutoff = 2
- else:
- di_values = pd.to_numeric(pred_df_full["DI_values"], errors="coerce")
- di_values = di_values.dropna()
- f = sp.stats.weibull_min.fit(di_values)
- cutoff = sp.stats.weibull_min.ppf(
- self.freqai_info.get("outlier_threshold", 0.999), *f
- )
-
- dk.data["DI_value_mean"] = pred_df_full["DI_values"].mean()
- dk.data["DI_value_std"] = pred_df_full["DI_values"].std()
- dk.data["extra_returns_per_train"]["DI_value_param1"] = f[0]
- dk.data["extra_returns_per_train"]["DI_value_param2"] = f[1]
- dk.data["extra_returns_per_train"]["DI_value_param3"] = f[2]
- dk.data["extra_returns_per_train"]["DI_cutoff"] = cutoff
-
- dk.data["extra_returns_per_train"]["label_period_candles"] = (
- self.get_label_period_candles(pair)
- )
- dk.data["extra_returns_per_train"]["hp_rmse"] = self.__optuna_hp_rmse.get(
- pair, -1
- )
- dk.data["extra_returns_per_train"]["period_rmse"] = (
- self.__optuna_period_rmse.get(pair, -1)
- )
-
- def eval_set_and_weights(self, X_test, y_test, test_weights):
- if self.data_split_parameters.get("test_size", TEST_SIZE) == 0:
- eval_set = None
- eval_weights = None
- else:
- eval_set = [(X_test, y_test)]
- eval_weights = [test_weights]
-
- return eval_set, eval_weights
-
- def optuna_storage(self, pair: str) -> optuna.storages.BaseStorage | None:
- storage_dir = self.full_path
- storage_filename = f"optuna-{pair.split('/')[0]}"
- storage_backend = self.__optuna_config.get("storage", "file")
- if storage_backend == "sqlite":
- storage = f"sqlite:///{storage_dir}/{storage_filename}.sqlite"
- elif storage_backend == "file":
- storage = optuna.storages.JournalStorage(
- optuna.storages.journal.JournalFileBackend(
- f"{storage_dir}/{storage_filename}.log"
- )
- )
- return storage
-
- def min_max_pred(
- self,
- pred_df: pd.DataFrame,
- fit_live_predictions_candles: int,
- label_period_candles: int,
- ) -> tuple[pd.Series, pd.Series]:
- prediction_thresholds_smoothing = self.freqai_info.get(
- "prediction_thresholds_smoothing", "quantile"
- )
- smoothing_methods: dict = {
- "quantile": self.quantile_min_max_pred,
- "mean": LightGBMRegressorQuickAdapterV3.mean_min_max_pred,
- "median": LightGBMRegressorQuickAdapterV3.median_min_max_pred,
- }
- return smoothing_methods.get(
- prediction_thresholds_smoothing, smoothing_methods["quantile"]
- )(pred_df, fit_live_predictions_candles, label_period_candles)
-
- def optuna_hp_enqueue_previous_best_trial(
- self,
- pair: str,
- study: optuna.study.Study,
- ) -> None:
- study_namespace = "hp"
- if self.__optuna_hp_params.get(pair):
- study.enqueue_trial(self.__optuna_hp_params[pair])
- elif self.optuna_load_best_params(pair, study_namespace):
- study.enqueue_trial(self.optuna_load_best_params(pair, study_namespace))
-
- def optuna_hp_optimize(
- self,
- pair: str,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- ) -> tuple[dict, float] | tuple[None, None]:
- identifier = self.freqai_info.get("identifier", "no_id_provided")
- study_namespace = "hp"
- study_name = f"{identifier}-{study_namespace}-{pair}"
- storage = self.optuna_storage(pair)
- pruner = optuna.pruners.HyperbandPruner()
- if self.__optuna_config.get("continuous", True):
- LightGBMRegressorQuickAdapterV3.optuna_study_delete(study_name, storage)
- study = optuna.create_study(
- study_name=study_name,
- sampler=optuna.samplers.TPESampler(
- multivariate=True,
- group=True,
- ),
- pruner=pruner,
- direction=optuna.study.StudyDirection.MINIMIZE,
- storage=storage,
- load_if_exists=not self.__optuna_config.get("continuous", True),
- )
- if self.__optuna_config.get("warm_start", True):
- self.optuna_hp_enqueue_previous_best_trial(pair, study)
- logger.info(f"Optuna {study_namespace} hyperopt started")
- start = time.time()
- try:
- study.optimize(
- lambda trial: hp_objective(
- trial,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- self.model_training_parameters,
- ),
- n_trials=self.__optuna_config.get("n_trials", N_TRIALS),
- n_jobs=min(
- self.__optuna_config.get("n_jobs", 1),
- max(int(self.max_system_threads / 4), 1),
- ),
- timeout=self.__optuna_config.get("timeout", 7200),
- gc_after_trial=True,
- )
- except Exception as e:
- time_spent = time.time() - start
- logger.error(
- f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): {e}",
- exc_info=True,
- )
- return None, None
- time_spent = time.time() - start
- if LightGBMRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False:
- logger.error(
- f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
- )
- return None, None
- logger.info(f"Optuna {study_namespace} hyperopt done ({time_spent:.2f} secs)")
-
- params = study.best_params
- self.optuna_save_best_params(pair, study_namespace, params)
- # log params
- for key, value in {"rmse": study.best_value, **params}.items():
- logger.info(f"Optuna {study_namespace} hyperopt | {key:>20s} : {value}")
- return params, study.best_value
-
- def optuna_period_enqueue_previous_best_trial(
- self,
- pair: str,
- study: optuna.study.Study,
- ) -> None:
- study_namespace = "period"
- if self.__optuna_period_params.get(pair):
- study.enqueue_trial(self.__optuna_period_params[pair])
- elif self.optuna_load_best_params(pair, study_namespace):
- study.enqueue_trial(self.optuna_load_best_params(pair, study_namespace))
-
- def optuna_period_optimize(
- self,
- pair: str,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- model_training_parameters,
- ) -> tuple[dict, float] | tuple[None, None]:
- identifier = self.freqai_info.get("identifier", "no_id_provided")
- study_namespace = "period"
- study_name = f"{identifier}-{study_namespace}-{pair}"
- storage = self.optuna_storage(pair)
- pruner = optuna.pruners.HyperbandPruner()
- if self.__optuna_config.get("continuous", True):
- LightGBMRegressorQuickAdapterV3.optuna_study_delete(study_name, storage)
- study = optuna.create_study(
- study_name=study_name,
- sampler=optuna.samplers.TPESampler(
- multivariate=True,
- group=True,
- ),
- pruner=pruner,
- direction=optuna.study.StudyDirection.MINIMIZE,
- storage=storage,
- load_if_exists=not self.__optuna_config.get("continuous", True),
- )
- if self.__optuna_config.get("warm_start", True):
- self.optuna_period_enqueue_previous_best_trial(pair, study)
- logger.info(f"Optuna {study_namespace} hyperopt started")
- start = time.time()
- try:
- study.optimize(
- lambda trial: period_objective(
- trial,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- self.freqai_info.get("fit_live_predictions_candles", 100),
- self.__optuna_config.get("candles_step", 10),
- model_training_parameters,
- ),
- n_trials=self.__optuna_config.get("n_trials", N_TRIALS),
- n_jobs=min(
- self.__optuna_config.get("n_jobs", 1),
- max(int(self.max_system_threads / 4), 1),
- ),
- timeout=self.__optuna_config.get("timeout", 7200),
- gc_after_trial=True,
- )
- except Exception as e:
- time_spent = time.time() - start
- logger.error(
- f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): {e}",
- exc_info=True,
- )
- return None, None
- time_spent = time.time() - start
- if LightGBMRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False:
- logger.error(
- f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
- )
- return None, None
- logger.info(f"Optuna {study_namespace} hyperopt done ({time_spent:.2f} secs)")
-
- params = study.best_params
- self.optuna_save_best_params(pair, study_namespace, params)
- # log params
- for key, value in {"rmse": study.best_value, **params}.items():
- logger.info(f"Optuna {study_namespace} hyperopt | {key:>20s} : {value}")
- return params, study.best_value
-
- def optuna_save_best_params(
- self, pair: str, namespace: str, best_params: dict
- ) -> None:
- best_params_path = Path(
- self.full_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json"
- )
- with best_params_path.open("w", encoding="utf-8") as write_file:
- json.dump(best_params, write_file, indent=4)
-
- def optuna_load_best_params(self, pair: str, namespace: str) -> dict | None:
- best_params_path = Path(
- self.full_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json"
- )
- if best_params_path.is_file():
- with best_params_path.open("r", encoding="utf-8") as read_file:
- return json.load(read_file)
- return None
-
- @staticmethod
- def optuna_study_delete(
- study_name: str, storage: optuna.storages.BaseStorage
- ) -> None:
- try:
- optuna.delete_study(study_name=study_name, storage=storage)
- except Exception:
- pass
-
- @staticmethod
- def optuna_study_load(
- study_name: str, storage: optuna.storages.BaseStorage
- ) -> optuna.study.Study | None:
- try:
- study = optuna.load_study(study_name=study_name, storage=storage)
- except Exception:
- study = None
- return study
-
- @staticmethod
- def optuna_study_has_best_params(study: optuna.study.Study | None) -> bool:
- if study is None:
- return False
- try:
- _ = study.best_params
- return True
- # file backend storage raises KeyError
- except KeyError:
- return False
- # sqlite backend storage raises ValueError
- except ValueError:
- return False
-
- @staticmethod
- def mean_min_max_pred(
- pred_df: pd.DataFrame,
- fit_live_predictions_candles: int,
- label_period_candles: int,
- ) -> tuple[pd.Series, pd.Series]:
- pred_df_sorted = (
- pred_df.select_dtypes(exclude=["object"])
- .copy()
- .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
- )
-
- label_period_frequency: int = int(
- fit_live_predictions_candles / (label_period_candles * 2)
- )
- min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean()
- max_pred = pred_df_sorted.iloc[:label_period_frequency].mean()
- return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-
- @staticmethod
- def median_min_max_pred(
- pred_df: pd.DataFrame,
- fit_live_predictions_candles: int,
- label_period_candles: int,
- ) -> tuple[pd.Series, pd.Series]:
- pred_df_sorted = (
- pred_df.select_dtypes(exclude=["object"])
- .copy()
- .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
- )
-
- label_period_frequency: int = int(
- fit_live_predictions_candles / (label_period_candles * 2)
- )
- min_pred = pred_df_sorted.iloc[-label_period_frequency:].median()
- max_pred = pred_df_sorted.iloc[:label_period_frequency].median()
- return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-
- def quantile_min_max_pred(
- self,
- pred_df: pd.DataFrame,
- fit_live_predictions_candles: int,
- label_period_candles: int,
- ) -> tuple[pd.Series, pd.Series]:
- pred_df_sorted = (
- pred_df.select_dtypes(exclude=["object"])
- .copy()
- .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
- )
-
- label_period_frequency: int = int(
- fit_live_predictions_candles / (label_period_candles * 2)
- )
- q = self.freqai_info.get("quantile", 0.75)
- min_pred = pred_df_sorted.iloc[-label_period_frequency:].quantile(1 - q)
- max_pred = pred_df_sorted.iloc[:label_period_frequency].quantile(q)
- return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-
-
-def period_objective(
- trial,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- fit_live_predictions_candles: int,
- candles_step: int,
- model_training_parameters,
-) -> float:
- min_train_window: int = fit_live_predictions_candles * 2
- max_train_window: int = max(len(X), min_train_window)
- train_window: int = trial.suggest_int(
- "train_period_candles", min_train_window, max_train_window, step=candles_step
- )
- X = X.iloc[-train_window:]
- y = y.iloc[-train_window:]
- train_weights = train_weights[-train_window:]
-
- min_test_window: int = fit_live_predictions_candles
- max_test_window: int = max(len(X_test), min_test_window)
- test_window: int = trial.suggest_int(
- "test_period_candles", min_test_window, max_test_window, step=candles_step
- )
- X_test = X_test.iloc[-test_window:]
- y_test = y_test.iloc[-test_window:]
- test_weights = test_weights[-test_window:]
-
- # Fit the model
- model = LGBMRegressor(objective="regression", **model_training_parameters)
- model.fit(
- X=X,
- y=y,
- sample_weight=train_weights,
- eval_set=[(X_test, y_test)],
- eval_sample_weight=[test_weights],
- eval_metric="rmse",
- callbacks=[optuna.integration.LightGBMPruningCallback(trial, "rmse")],
- )
- y_pred = model.predict(X_test)
-
- min_label_period_candles: int = max(fit_live_predictions_candles // 20, 20)
- max_label_period_candles: int = min(
- max(fit_live_predictions_candles // 6, min_label_period_candles),
- test_window // 2,
- )
- label_period_candles: int = trial.suggest_int(
- "label_period_candles",
- min_label_period_candles,
- max_label_period_candles,
- step=candles_step,
- )
- label_window_length: int = label_period_candles * 2
- label_windows_length: int = (
- test_window // label_window_length
- ) * label_window_length
- if label_windows_length == 0 or label_window_length > test_window:
- return float("inf")
- y_test_period = [
- y_test.iloc[-label_windows_length:].to_numpy()[i : i + label_window_length]
- for i in range(0, label_windows_length, label_window_length)
- ]
- test_weights_period = [
- test_weights[-label_windows_length:][i : i + label_window_length]
- for i in range(0, label_windows_length, label_window_length)
- ]
- y_pred_period = [
- y_pred[-label_windows_length:][i : i + label_window_length]
- for i in range(0, label_windows_length, label_window_length)
- ]
-
- errors = [
- sklearn.metrics.root_mean_squared_error(y_t, y_p, sample_weight=t_w)
- for y_t, y_p, t_w in zip(y_test_period, y_pred_period, test_weights_period)
- ]
-
- return geometric_mean(errors)
-
-
-def hp_objective(
- trial,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- model_training_parameters,
-) -> float:
- study_parameters = {
- "num_leaves": trial.suggest_int("num_leaves", 2, 256),
- "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
- "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
- "min_child_weight": trial.suggest_int("min_child_weight", 1, 200),
- "subsample": trial.suggest_float("subsample", 0.6, 1.0),
- "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
- "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True),
- "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True),
- }
- model_training_parameters = {**model_training_parameters, **study_parameters}
-
- # Fit the model
- model = LGBMRegressor(objective="regression", **model_training_parameters)
- model.fit(
- X=X,
- y=y,
- sample_weight=train_weights,
- eval_set=[(X_test, y_test)],
- eval_sample_weight=[test_weights],
- eval_metric="rmse",
- callbacks=[optuna.integration.LightGBMPruningCallback(trial, "rmse")],
- )
- y_pred = model.predict(X_test)
-
- error = sklearn.metrics.root_mean_squared_error(
- y_test, y_pred, sample_weight=test_weights
- )
-
- return error
import logging
import json
-from statistics import geometric_mean
-from typing import Any
-from pathlib import Path
-
-from xgboost import XGBRegressor
import time
-from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
-from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
+import numpy as np
import pandas as pd
import scipy as sp
import optuna
import sklearn
import warnings
-N_TRIALS = 36
+from statistics import geometric_mean
+from functools import cached_property
+from typing import Any, Callable, Optional
+from pathlib import Path
+from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
+from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
+
+
TEST_SIZE = 0.1
EXTREMA_COLUMN = "&s-extrema"
logger = logging.getLogger(__name__)
-class XGBoostRegressorQuickAdapterV3(BaseRegressionModel):
+class QuickAdapterRegressorV3(BaseRegressionModel):
"""
The following freqaimodel is released to sponsors of the non-profit FreqAI open-source project.
If you find the FreqAI project useful, please consider supporting it by becoming a sponsor.
https://github.com/sponsors/robcaulk
"""
- version = "3.6.3"
+ version = "3.6.4"
+
+ @cached_property
+ def __optuna_config(self) -> dict:
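+ # Defaults listed first so user-supplied optuna_hyperopt keys override
+ # them in the merge below.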
+ return {
+ **{
+ "enabled": False,
+ "n_jobs": min(
+ self.freqai_info.get("optuna_hyperopt", {}).get("n_jobs", 1),
+ max(int(self.max_system_threads / 4), 1),
+ ),
+ "storage": "file",
+ "continuous": True,
+ "warm_start": True,
+ "n_trials": 36,
+ "timeout": 7200,
+ "candles_step": 10,
+ },
+ **self.freqai_info.get("optuna_hyperopt", {}),
+ }
def __init__(self, **kwargs):
super().__init__(**kwargs)
raise ValueError(
"FreqAI model requires StaticPairList method defined in pairlists configuration and pair_whitelist defined in exchange section configuration"
)
- self.__optuna_config = self.freqai_info.get("optuna_hyperopt", {})
+ if not (self.freqai_info.get("identifier") or "").strip():
+ raise ValueError(
+ "FreqAI model requires identifier defined in the freqai section configuration"
+ )
self.__optuna_hyperopt: bool = (
self.freqai_info.get("enabled", False)
- and self.__optuna_config.get("enabled", False)
+ and self.__optuna_config.get("enabled")
and self.data_split_parameters.get("test_size", TEST_SIZE) > 0
)
self.__optuna_hp_rmse: dict[str, float] = {}
self.optuna_load_best_params(pair, "period") or {}
)
logger.info(
- f"Initialized {self.__class__.__name__} model version {self.version}"
+ f"Initialized {self.__class__.__name__} {self.freqai_info.get('regressor', 'xgboost')} regressor model version {self.version}"
)
def fit(self, data_dictionary: dict, dk: FreqaiDataKitchen, **kwargs) -> Any:
model_training_parameters = self.model_training_parameters
- xgb_model = self.get_init_model(dk.pair)
+ init_model = self.get_init_model(dk.pair)
start = time.time()
if self.__optuna_hyperopt:
- optuna_hp_params, optuna_hp_rmse = self.optuna_hp_optimize(
+ self.optuna_hp_optimize(
dk.pair, X, y, train_weights, X_test, y_test, test_weights
)
- if optuna_hp_params:
- self.__optuna_hp_params[dk.pair] = optuna_hp_params
- if optuna_hp_rmse:
- self.__optuna_hp_rmse[dk.pair] = optuna_hp_rmse
if self.__optuna_hp_params.get(dk.pair):
model_training_parameters = {
**self.__optuna_hp_params[dk.pair],
}
- optuna_period_params, optuna_period_rmse = self.optuna_period_optimize(
+ self.optuna_period_optimize(
dk.pair,
X,
y,
test_weights,
model_training_parameters,
)
- if optuna_period_params:
- self.__optuna_period_params[dk.pair] = optuna_period_params
- if optuna_period_rmse:
- self.__optuna_period_rmse[dk.pair] = optuna_period_rmse
if self.__optuna_period_params.get(dk.pair):
train_window = self.__optuna_period_params[dk.pair].get(
y_test = y_test.iloc[-test_window:]
test_weights = test_weights[-test_window:]
- model = XGBRegressor(
- objective="reg:squarederror",
- eval_metric="rmse",
- **model_training_parameters,
- )
-
eval_set, eval_weights = self.eval_set_and_weights(X_test, y_test, test_weights)
- model.fit(
+ model = train_regressor(
+ regressor=self.freqai_info.get("regressor", "xgboost"),
X=X,
y=y,
- sample_weight=train_weights,
+ train_weights=train_weights,
eval_set=eval_set,
- sample_weight_eval_set=eval_weights,
- xgb_model=xgb_model,
+ eval_weights=eval_weights,
+ model_training_parameters=model_training_parameters,
+ init_model=init_model,
)
time_spent = time.time() - start
self.dd.update_metric_tracker("fit_time", time_spent, dk.pair)
return eval_set, eval_weights
- def optuna_storage(self, pair: str) -> optuna.storages.BaseStorage | None:
- storage_dir = self.full_path
- storage_filename = f"optuna-{pair.split('/')[0]}"
- storage_backend = self.__optuna_config.get("storage", "file")
- if storage_backend == "sqlite":
- storage = f"sqlite:///{storage_dir}/{storage_filename}.sqlite"
- elif storage_backend == "file":
- storage = optuna.storages.JournalStorage(
- optuna.storages.journal.JournalFileBackend(
- f"{storage_dir}/{storage_filename}.log"
- )
- )
- return storage
-
def min_max_pred(
self,
pred_df: pd.DataFrame,
)
smoothing_methods: dict = {
"quantile": self.quantile_min_max_pred,
- "mean": XGBoostRegressorQuickAdapterV3.mean_min_max_pred,
- "median": XGBoostRegressorQuickAdapterV3.median_min_max_pred,
+ "mean": QuickAdapterRegressorV3.mean_min_max_pred,
+ "median": QuickAdapterRegressorV3.median_min_max_pred,
}
return smoothing_methods.get(
prediction_thresholds_smoothing, smoothing_methods["quantile"]
)(pred_df, fit_live_predictions_candles, label_period_candles)
- def optuna_hp_enqueue_previous_best_trial(
+ def optuna_hp_optimize(
self,
pair: str,
- study: optuna.study.Study,
- ) -> None:
- study_namespace = "hp"
- if self.__optuna_hp_params.get(pair):
- study.enqueue_trial(self.__optuna_hp_params[pair])
- elif self.optuna_load_best_params(pair, study_namespace):
- study.enqueue_trial(self.optuna_load_best_params(pair, study_namespace))
+ X: pd.DataFrame,
+ y: pd.DataFrame,
+ train_weights: np.ndarray,
+ X_test: pd.DataFrame,
+ y_test: pd.DataFrame,
+ test_weights: np.ndarray,
+ ) -> tuple[dict, float] | tuple[None, None]:
+ namespace = "hp"
+ identifier = self.freqai_info["identifier"]
+ study = self.optuna_create_study(f"{identifier}-{namespace}-{pair}", pair)
+ if study is None:
+ return None, None
- def optuna_hp_optimize(
+ if self.__optuna_config.get("warm_start"):
+ self.optuna_enqueue_previous_best_params(pair, study, namespace)
+
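+ # The closure binds the training data, so Optuna only passes the trial.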
+ def objective(trial: optuna.Trial) -> float:
+ return hp_objective(
+ trial,
+ self.freqai_info.get("regressor", "xgboost"),
+ X,
+ y,
+ train_weights,
+ X_test,
+ y_test,
+ test_weights,
+ self.model_training_parameters,
+ )
+
+ return self.optuna_process_study(
+ study=study, pair=pair, namespace=namespace, objective=objective
+ )
+
+ def optuna_period_optimize(
self,
pair: str,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
+ X: pd.DataFrame,
+ y: pd.DataFrame,
+ train_weights: np.ndarray,
+ X_test: pd.DataFrame,
+ y_test: pd.DataFrame,
+ test_weights: np.ndarray,
+ model_training_parameters: dict,
) -> tuple[dict, float] | tuple[None, None]:
- identifier = self.freqai_info.get("identifier", "no_id_provided")
- study_namespace = "hp"
- study_name = f"{identifier}-{study_namespace}-{pair}"
- storage = self.optuna_storage(pair)
- pruner = optuna.pruners.HyperbandPruner()
- if self.__optuna_config.get("continuous", True):
- XGBoostRegressorQuickAdapterV3.optuna_study_delete(study_name, storage)
- study = optuna.create_study(
- study_name=study_name,
- sampler=optuna.samplers.TPESampler(
- multivariate=True,
- group=True,
- ),
- pruner=pruner,
- direction=optuna.study.StudyDirection.MINIMIZE,
- storage=storage,
- load_if_exists=not self.__optuna_config.get("continuous", True),
+ namespace = "period"
+ identifier = self.freqai_info["identifier"]
+ study = self.optuna_create_study(f"{identifier}-{namespace}-{pair}", pair)
+ if study is None:
+ return None, None
+
+ if self.__optuna_config.get("warm_start"):
+ self.optuna_enqueue_previous_best_params(pair, study, namespace)
+
+ def objective(trial: optuna.Trial) -> float:
+ return period_objective(
+ trial,
+ self.freqai_info.get("regressor", "xgboost"),
+ X,
+ y,
+ train_weights,
+ X_test,
+ y_test,
+ test_weights,
+ self.freqai_info.get("fit_live_predictions_candles", 100),
+ self.__optuna_config.get("candles_step"),
+ model_training_parameters,
+ )
+
+ return self.optuna_process_study(
+ study=study, pair=pair, namespace=namespace, objective=objective
)
- if self.__optuna_config.get("warm_start", True):
- self.optuna_hp_enqueue_previous_best_trial(pair, study)
- logger.info(f"Optuna {study_namespace} hyperopt started")
- start = time.time()
+
+ def optuna_storage(self, pair: str) -> Optional[optuna.storages.BaseStorage]:
+ storage_dir = self.full_path
+ storage_filename = f"optuna-{pair.split('/')[0]}"
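+ # "file" maps to Optuna's append-only journal storage; "sqlite" keeps the
+ # studies in a local SQLite database. Any other value yields no storage.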
+ storage_backend = self.__optuna_config.get("storage")
+ if storage_backend == "sqlite":
+ storage = f"sqlite:///{storage_dir}/{storage_filename}.sqlite"
+ elif storage_backend == "file":
+ storage = optuna.storages.JournalStorage(
+ optuna.storages.journal.JournalFileBackend(
+ f"{storage_dir}/{storage_filename}.log"
+ )
+ )
+ else:
+ storage = None
+ return storage
+
+ def optuna_create_study(
+ self, study_name: str, pair: str
+ ) -> Optional[optuna.study.Study]:
+ storage = self.optuna_storage(pair)
+ if storage is None:
+ logger.error(f"Failed to create optuna storage for {study_name}")
+ return None
+
+ if self.__optuna_config.get("continuous"):
+ self.optuna_study_delete(study_name, storage)
+
try:
- study.optimize(
- lambda trial: hp_objective(
- trial,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- self.model_training_parameters,
- ),
- n_trials=self.__optuna_config.get("n_trials", N_TRIALS),
- n_jobs=min(
- self.__optuna_config.get("n_jobs", 1),
- max(int(self.max_system_threads / 4), 1),
- ),
- timeout=self.__optuna_config.get("timeout", 7200),
- gc_after_trial=True,
+ return optuna.create_study(
+ study_name=study_name,
+ sampler=optuna.samplers.TPESampler(multivariate=True, group=True),
+ pruner=optuna.pruners.HyperbandPruner(),
+ direction=optuna.study.StudyDirection.MINIMIZE,
+ storage=storage,
+ load_if_exists=not self.__optuna_config.get("continuous"),
)
except Exception as e:
- time_spent = time.time() - start
logger.error(
- f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): {e}",
- exc_info=True,
+ f"Failed to create optuna study {study_name}: {str(e)}", exc_info=True
)
- return None, None
- time_spent = time.time() - start
- if XGBoostRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False:
+ return None
+
+ def optuna_enqueue_previous_best_params(
+ self, pair: str, study: optuna.study.Study, namespace: str
+ ) -> None:
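+ # The per-namespace caches are name-mangled attributes, so the lookup
+ # uses the defining class prefix explicitly (subclass-safe).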
+ best_params = getattr(
+ self, f"_QuickAdapterRegressorV3__optuna_{namespace}_params"
+ ).get(pair)
+ if not best_params:
+ best_params = self.optuna_load_best_params(pair, namespace)
+ if best_params:
+ study.enqueue_trial(best_params)
+
+ def optuna_handle_error(
+ self, namespace: str, start_time: float, e: Exception
+ ) -> None:
+ time_spent = time.time() - start_time
+ logger.error(
+ f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): {str(e)}",
+ exc_info=True,
+ )
+
+ def optuna_process_results(
+ self, study: optuna.study.Study, pair: str, namespace: str, start_time: float
+ ) -> tuple[dict, float] | tuple[None, None]:
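+ # Cache and persist the best params per pair under the study namespace so
+ # subsequent fits and warm starts can reuse them.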
+ time_spent = time.time() - start_time
+
+ if not self.optuna_study_has_best_params(study):
logger.error(
- f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
+ f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
)
return None, None
- logger.info(f"Optuna {study_namespace} hyperopt done ({time_spent:.2f} secs)")
params = study.best_params
- self.optuna_save_best_params(pair, study_namespace, params)
- # log params
- for key, value in {"rmse": study.best_value, **params}.items():
- logger.info(f"Optuna {study_namespace} hyperopt | {key:>20s} : {value}")
- return params, study.best_value
+ rmse = study.best_value
- def optuna_period_enqueue_previous_best_trial(
- self,
- pair: str,
- study: optuna.study.Study,
- ) -> None:
- study_namespace = "period"
- if self.__optuna_period_params.get(pair):
- study.enqueue_trial(self.__optuna_period_params[pair])
- elif self.optuna_load_best_params(pair, study_namespace):
- study.enqueue_trial(self.optuna_load_best_params(pair, study_namespace))
+ logger.info(f"Optuna {namespace} hyperopt done ({time_spent:.2f} secs)")
+ for key, value in {"rmse": rmse, **params}.items():
+ logger.info(f"Optuna {namespace} hyperopt | {key:>20s} : {value}")
- def optuna_period_optimize(
+ if namespace == "hp":
+ self.__optuna_hp_params[pair] = params
+ self.__optuna_hp_rmse[pair] = rmse
+ elif namespace == "period":
+ self.__optuna_period_params[pair] = params
+ self.__optuna_period_rmse[pair] = rmse
+
+ self.optuna_save_best_params(pair, namespace, params)
+
+ return params, rmse
+
+ def optuna_process_study(
self,
+ study: optuna.study.Study,
pair: str,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- model_training_parameters,
+ namespace: str,
+ objective: Callable[[optuna.Trial], float],
) -> tuple[dict, float] | tuple[None, None]:
- identifier = self.freqai_info.get("identifier", "no_id_provided")
- study_namespace = "period"
- study_name = f"{identifier}-{study_namespace}-{pair}"
- storage = self.optuna_storage(pair)
- pruner = optuna.pruners.HyperbandPruner()
- if self.__optuna_config.get("continuous", True):
- XGBoostRegressorQuickAdapterV3.optuna_study_delete(study_name, storage)
- study = optuna.create_study(
- study_name=study_name,
- sampler=optuna.samplers.TPESampler(
- multivariate=True,
- group=True,
- ),
- pruner=pruner,
- direction=optuna.study.StudyDirection.MINIMIZE,
- storage=storage,
- load_if_exists=not self.__optuna_config.get("continuous", True),
- )
- if self.__optuna_config.get("warm_start", True):
- self.optuna_period_enqueue_previous_best_trial(pair, study)
- logger.info(f"Optuna {study_namespace} hyperopt started")
- start = time.time()
+ logger.info(f"Optuna {namespace} hyperopt started")
+ start_time = time.time()
+
try:
study.optimize(
- lambda trial: period_objective(
- trial,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- self.freqai_info.get("fit_live_predictions_candles", 100),
- self.__optuna_config.get("candles_step", 10),
- model_training_parameters,
- ),
- n_trials=self.__optuna_config.get("n_trials", N_TRIALS),
- n_jobs=min(
- self.__optuna_config.get("n_jobs", 1),
- max(int(self.max_system_threads / 4), 1),
- ),
- timeout=self.__optuna_config.get("timeout", 7200),
+ objective,
+ n_trials=self.__optuna_config.get("n_trials"),
+ n_jobs=self.__optuna_config.get("n_jobs"),
+ timeout=self.__optuna_config.get("timeout"),
gc_after_trial=True,
)
except Exception as e:
- time_spent = time.time() - start
- logger.error(
- f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): {e}",
- exc_info=True,
- )
+ self.optuna_handle_error(namespace, start_time, e)
return None, None
- time_spent = time.time() - start
- if XGBoostRegressorQuickAdapterV3.optuna_study_has_best_params(study) is False:
- logger.error(
- f"Optuna {study_namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
- )
- return None, None
- logger.info(f"Optuna {study_namespace} hyperopt done ({time_spent:.2f} secs)")
- params = study.best_params
- self.optuna_save_best_params(pair, study_namespace, params)
- # log params
- for key, value in {"rmse": study.best_value, **params}.items():
- logger.info(f"Optuna {study_namespace} hyperopt | {key:>20s} : {value}")
- return params, study.best_value
+ return self.optuna_process_results(study, pair, namespace, start_time)
def optuna_save_best_params(
self, pair: str, namespace: str, best_params: dict
best_params_path = Path(
self.full_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json"
)
- with best_params_path.open("w", encoding="utf-8") as write_file:
- json.dump(best_params, write_file, indent=4)
+ try:
+ with best_params_path.open("w", encoding="utf-8") as write_file:
+ json.dump(best_params, write_file, indent=4)
+ except Exception as e:
+ logger.error(
+ f"Failed to save optuna {namespace} best params for {pair}: {str(e)}",
+ exc_info=True,
+ )
+ raise
- def optuna_load_best_params(self, pair: str, namespace: str) -> dict | None:
+ def optuna_load_best_params(self, pair: str, namespace: str) -> Optional[dict]:
best_params_path = Path(
self.full_path / f"optuna-{namespace}-best-params-{pair.split('/')[0]}.json"
)
@staticmethod
def optuna_study_load(
study_name: str, storage: optuna.storages.BaseStorage
- ) -> optuna.study.Study | None:
+ ) -> Optional[optuna.study.Study]:
try:
study = optuna.load_study(study_name=study_name, storage=storage)
except Exception:
return study
@staticmethod
- def optuna_study_has_best_params(study: optuna.study.Study | None) -> bool:
+ def optuna_study_has_best_params(study: Optional[optuna.study.Study]) -> bool:
if study is None:
return False
try:
label_period_candles: int,
) -> tuple[pd.Series, pd.Series]:
pred_df_sorted = (
- pred_df.select_dtypes(exclude=["object"])
+ pred_df[[EXTREMA_COLUMN]]
.copy()
- .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
+ .sort_values(by=EXTREMA_COLUMN, ascending=False)
+ .reset_index(drop=True)
)
label_period_frequency: int = int(
label_period_candles: int,
) -> tuple[pd.Series, pd.Series]:
pred_df_sorted = (
- pred_df.select_dtypes(exclude=["object"])
+ pred_df[[EXTREMA_COLUMN]]
.copy()
- .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
+ .sort_values(by=EXTREMA_COLUMN, ascending=False)
+ .reset_index(drop=True)
)
label_period_frequency: int = int(
label_period_candles: int,
) -> tuple[pd.Series, pd.Series]:
pred_df_sorted = (
- pred_df.select_dtypes(exclude=["object"])
+ pred_df[[EXTREMA_COLUMN]]
.copy()
- .apply(lambda col: col.sort_values(ascending=False, ignore_index=True))
+ .sort_values(by=EXTREMA_COLUMN, ascending=False)
+ .reset_index(drop=True)
)
label_period_frequency: int = int(
return min_pred[EXTREMA_COLUMN], max_pred[EXTREMA_COLUMN]
-def period_objective(
- trial,
+def get_callbacks(trial: optuna.Trial, regressor: str) -> list:
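+ # Pruning callbacks report the eval-set RMSE to Optuna each boosting
+ # round so the HyperbandPruner can stop unpromising trials early.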
+ if regressor == "xgboost":
+ callbacks = [
+ optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")
+ ]
+ elif regressor == "lightgbm":
+ callbacks = [optuna.integration.LightGBMPruningCallback(trial, "rmse")]
+ else:
+ raise ValueError(f"Unsupported regressor model: {regressor}")
+ return callbacks
+
+
+def train_regressor(
+ regressor: str,
X,
y,
train_weights,
- X_test,
- y_test,
- test_weights,
+ eval_set,
+ eval_weights,
+ model_training_parameters: dict,
+ init_model: Any = None,
+ callbacks: Optional[list] = None,
+) -> Any:
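+ # Imports are deferred per branch so only the selected regressor's
+ # package needs to be installed.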
+ if regressor == "xgboost":
+ from xgboost import XGBRegressor
+
+ model = XGBRegressor(
+ objective="reg:squarederror",
+ eval_metric="rmse",
+ callbacks=callbacks,
+ **model_training_parameters,
+ )
+ model.fit(
+ X=X,
+ y=y,
+ sample_weight=train_weights,
+ eval_set=eval_set,
+ sample_weight_eval_set=eval_weights,
+ xgb_model=init_model,
+ )
+ elif regressor == "lightgbm":
+ from lightgbm import LGBMRegressor
+
+ model = LGBMRegressor(objective="regression", **model_training_parameters)
+ model.fit(
+ X=X,
+ y=y,
+ sample_weight=train_weights,
+ eval_set=eval_set,
+ eval_sample_weight=eval_weights,
+ eval_metric="rmse",
+ init_model=init_model,
+ callbacks=callbacks,
+ )
+ else:
+ raise ValueError(f"Unsupported regressor model: {regressor}")
+ return model
+
+
+def period_objective(
+ trial: optuna.Trial,
+ regressor: str,
+ X: pd.DataFrame,
+ y: pd.DataFrame,
+ train_weights: np.ndarray,
+ X_test: pd.DataFrame,
+ y_test: pd.DataFrame,
+ test_weights: np.ndarray,
fit_live_predictions_candles: int,
candles_step: int,
- model_training_parameters,
+ model_training_parameters: dict,
) -> float:
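+ # Trial-suggested train/test window lengths slice the most recent candles
+ # before fitting and evaluation.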
min_train_window: int = fit_live_predictions_candles * 2
max_train_window: int = max(len(X), min_train_window)
y_test = y_test.iloc[-test_window:]
test_weights = test_weights[-test_window:]
- # Fit the model
- model = XGBRegressor(
- objective="reg:squarederror",
- eval_metric="rmse",
- callbacks=[
- optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")
- ],
- **model_training_parameters,
- )
- model.fit(
+ model = train_regressor(
+ regressor=regressor,
X=X,
y=y,
- sample_weight=train_weights,
+ train_weights=train_weights,
eval_set=[(X_test, y_test)],
- sample_weight_eval_set=[test_weights],
+ eval_weights=[test_weights],
+ model_training_parameters=model_training_parameters,
+ callbacks=get_callbacks(trial, regressor),
)
y_pred = model.predict(X_test)
return geometric_mean(errors)
-def hp_objective(
- trial,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- model_training_parameters,
-) -> float:
- study_parameters = {
+def get_optuna_study_model_parameters(trial: optuna.Trial, regressor: str) -> dict:
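+ # Dimensions common to both regressors; regressor-specific ones are
+ # appended below.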
+ study_model_parameters = {
"learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
- "max_depth": trial.suggest_int("max_depth", 3, 18),
"min_child_weight": trial.suggest_int("min_child_weight", 1, 200),
"subsample": trial.suggest_float("subsample", 0.6, 1.0),
"colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
"reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True),
"reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True),
}
- model_training_parameters = {**model_training_parameters, **study_parameters}
+ if regressor == "xgboost":
+ study_model_parameters.update(
+ {
+ "max_depth": trial.suggest_int("max_depth", 3, 18),
+ }
+ )
+ elif regressor == "lightgbm":
+ study_model_parameters.update(
+ {
+ "num_leaves": trial.suggest_int("num_leaves", 2, 256),
+ "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
+ }
+ )
+ return study_model_parameters
- # Fit the model
- model = XGBRegressor(
- objective="reg:squarederror",
- eval_metric="rmse",
- callbacks=[
- optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")
- ],
- **model_training_parameters,
- )
- model.fit(
+
+def hp_objective(
+ trial: optuna.Trial,
+ regressor: str,
+ X: pd.DataFrame,
+ y: pd.DataFrame,
+ train_weights: np.ndarray,
+ X_test: pd.DataFrame,
+ y_test: pd.DataFrame,
+ test_weights: np.ndarray,
+ model_training_parameters: dict,
+) -> float:
+ study_model_parameters = get_optuna_study_model_parameters(trial, regressor)
+ model_training_parameters = {**model_training_parameters, **study_model_parameters}
+
+ model = train_regressor(
+ regressor=regressor,
X=X,
y=y,
- sample_weight=train_weights,
+ train_weights=train_weights,
eval_set=[(X_test, y_test)],
- sample_weight_eval_set=[test_weights],
+ eval_weights=[test_weights],
+ model_training_parameters=model_training_parameters,
+ callbacks=get_callbacks(trial, regressor),
)
y_pred = model.predict(X_test)
import json
import logging
-from functools import reduce
+from functools import reduce, cached_property
import datetime
import math
from pathlib import Path
INTERFACE_VERSION = 3
def version(self) -> str:
- return "3.2.6"
+ return "3.2.7"
timeframe = "5m"
stoploss = -0.02
use_custom_stoploss = True
- @property
+ @cached_property
def trailing_stoploss_natr_ratio(self) -> float:
return self.config.get("trailing_stoploss_natr_ratio", 0.025)
trailing_stop_positive_offset = 0.011
trailing_only_offset_is_reached = True
- @property
+ @cached_property
def entry_natr_ratio(self) -> float:
return self.config.get("entry_pricing", {}).get("entry_natr_ratio", 0.0025)
# reward_risk_ratio = 1.0 means 1:1 RR
# reward_risk_ratio = 2.0 means 1:2 RR
# ...
- @property
+ @cached_property
def reward_risk_ratio(self) -> float:
return self.config.get("exit_pricing", {}).get("reward_risk_ratio", 2.0)
process_only_new_candles = True
- @property
+ @cached_property
def can_short(self) -> bool:
return self.is_short_allowed()
- @property
+ @cached_property
def plot_config(self) -> dict:
return {
"main_plot": {},
},
}
- @property
+ @cached_property
def protections(self) -> list[dict]:
fit_live_predictions_candles = self.freqai_info.get(
"fit_live_predictions_candles", 100
use_exit_signal = True
- @property
+ @cached_property
def startup_candle_count(self) -> int:
# Match the predictions warmup period
return self.freqai_info.get("fit_live_predictions_candles", 100)
raise ValueError(
"FreqAI strategy requires StaticPairList method defined in pairlists configuration and pair_whitelist defined in exchange section configuration"
)
+ if not (self.freqai_info.get("identifier") or "").strip():
+ raise ValueError(
+ "FreqAI strategy requires identifier defined in the freqai section configuration"
+ )
self.models_full_path = Path(
self.config["user_data_dir"]
/ "models"
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
return df
- def get_trade_entry_candle(self, df: DataFrame, trade: Trade) -> DataFrame | None:
+ def get_trade_entry_candle(
+ self, df: DataFrame, trade: Trade
+ ) -> Optional[DataFrame]:
entry_date = timeframe_to_prev_date(self.timeframe, trade.open_date_utc)
entry_candle = df.loc[(df["date"] == entry_date)]
if entry_candle.empty:
return None
return entry_candle
- def get_trade_entry_natr(self, df: DataFrame, trade: Trade) -> float | None:
+ def get_trade_entry_natr(self, df: DataFrame, trade: Trade) -> Optional[float]:
entry_candle = self.get_trade_entry_candle(df, trade)
if entry_candle is None:
return None
entry_candle = entry_candle.squeeze()
return entry_candle["natr_labeling_window"]
- def get_trade_duration_candles(self, df: DataFrame, trade: Trade) -> int | None:
+ def get_trade_duration_candles(self, df: DataFrame, trade: Trade) -> Optional[int]:
"""
Get the number of candles since the trade entry.
:param df: DataFrame with the current data
def get_stoploss_distance(
self, df: DataFrame, trade: Trade, current_rate: float
- ) -> float | None:
+ ) -> Optional[float]:
trade_duration_candles = self.get_trade_duration_candles(df, trade)
if QuickAdapterV3.is_trade_duration_valid(trade_duration_candles) is False:
return None
* (1 / math.log10(1 + 0.25 * trade_duration_candles))
)
- def get_take_profit_distance(self, df: DataFrame, trade: Trade) -> float | None:
+ def get_take_profit_distance(self, df: DataFrame, trade: Trade) -> Optional[float]:
trade_duration_candles = self.get_trade_duration_candles(df, trade)
if QuickAdapterV3.is_trade_duration_valid(trade_duration_candles) is False:
return None
current_rate: float,
current_profit: float,
**kwargs,
- ) -> float | None:
+ ) -> Optional[float]:
df, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
if df.empty:
current_rate: float,
current_profit: float,
**kwargs,
- ) -> str | None:
+ ) -> Optional[str]:
df, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
if df.empty:
smoothing_methods["gaussian"],
)
- def load_period_best_params(self, pair: str) -> dict | None:
+ def load_period_best_params(self, pair: str) -> Optional[dict]:
namespace = "period"
best_params_path = Path(
self.models_full_path
import numpy as np
import pandas as pd
import talib.abstract as ta
+from typing import Callable
from scipy.signal import convolve
from scipy.signal.windows import gaussian
from technical import qtpylib
return 2 * series - series.shift(lag)
-def get_ma_fn(mamode: str) -> callable:
+def get_ma_fn(mamode: str) -> Callable[[pd.Series, int], pd.Series]:
mamodes: dict = {
"sma": ta.SMA,
"ema": ta.EMA,
return smma
-def get_price_fn(pricemode: str) -> callable:
+def get_price_fn(pricemode: str) -> Callable[[pd.DataFrame], pd.Series]:
pricemodes = {
"average": ta.AVGPRICE,
"median": ta.MEDPRICE,