@cached_property
def __optuna_config(self) -> dict:
+ optuna_default_config = {
+ "enabled": False,
+ "n_jobs": min(
+ self.freqai_info.get("optuna_hyperopt", {}).get("n_jobs", 1),
+ max(int(self.max_system_threads / 4), 1),
+ ),
+ "storage": "file",
+ "continuous": True,
+ "warm_start": True,
+ "n_trials": 36,
+ "timeout": 7200,
+ "candles_step": 10,
+ }
return {
- **{
- "enabled": False,
- "n_jobs": min(
- self.freqai_info.get("optuna_hyperopt", {}).get("n_jobs", 1),
- max(int(self.max_system_threads / 4), 1),
- ),
- "storage": "file",
- "continuous": True,
- "warm_start": True,
- "n_trials": 36,
- "timeout": 7200,
- "candles_step": 10,
- },
+ **optuna_default_config,
**self.freqai_info.get("optuna_hyperopt", {}),
}
self.pairs = self.config.get("exchange", {}).get("pair_whitelist")
if not self.pairs:
raise ValueError(
- "FreqAI model requires StaticPairList method defined in pairlists configuration and pair_whitelist defined in exchange section configuration"
+ "FreqAI model requires StaticPairList method defined in pairlists configuration and 'pair_whitelist' defined in exchange section configuration"
)
if (
self.freqai_info.get("identifier") is None
or self.freqai_info.get("identifier").strip() == ""
):
raise ValueError(
- "FreqAI model requires identifier defined in the freqai section configuration"
+ "FreqAI model requires 'identifier' defined in the freqai section configuration"
)
self.__optuna_hyperopt: bool = (
self.freqai_info.get("enabled", False)
start = time.time()
if self.__optuna_hyperopt:
- self.optuna_hp_optimize(
- dk.pair, X, y, train_weights, X_test, y_test, test_weights
+ self.optuna_optimize(
+ dk.pair,
+ "hp",
+ lambda trial: hp_objective(
+ trial,
+ self.freqai_info.get("regressor", "xgboost"),
+ X,
+ y,
+ train_weights,
+ X_test,
+ y_test,
+ test_weights,
+ model_training_parameters,
+ ),
+ self.__optuna_hp_params,
+ self.__optuna_hp_rmse,
)
if self.__optuna_hp_params.get(dk.pair):
**self.__optuna_hp_params[dk.pair],
}
- self.optuna_period_optimize(
+ self.optuna_optimize(
dk.pair,
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- model_training_parameters,
+ "period",
+ lambda trial: period_objective(
+ trial,
+ self.freqai_info.get("regressor", "xgboost"),
+ X,
+ y,
+ train_weights,
+ X_test,
+ y_test,
+ test_weights,
+ self.freqai_info.get("fit_live_predictions_candles", 100),
+ self.__optuna_config.get("candles_step"),
+ model_training_parameters,
+ ),
+ self.__optuna_period_params,
+ self.__optuna_period_rmse,
)
if self.__optuna_period_params.get(dk.pair):
return model
def get_label_period_candles(self, pair: str) -> int:
- if self.__optuna_period_params.get(pair, {}).get("label_period_candles"):
- return self.__optuna_period_params[pair]["label_period_candles"]
- return self.ft_params["label_period_candles"]
+ label_period_candles = self.__optuna_period_params.get(pair, {}).get(
+ "label_period_candles"
+ )
+ if label_period_candles:
+ return label_period_candles
+ return self.ft_params.get("label_period_candles", 50)
def fit_live_predictions(self, dk: FreqaiDataKitchen, pair: str) -> None:
warmed_up = True
self.__optuna_period_rmse.get(pair, -1)
)
- def eval_set_and_weights(self, X_test, y_test, test_weights):
+ def eval_set_and_weights(
+ self, X_test: pd.DataFrame, y_test: pd.DataFrame, test_weights: np.ndarray
+ ) -> tuple[list[tuple] | None, list[np.ndarray] | None]:
if self.data_split_parameters.get("test_size", TEST_SIZE) == 0:
eval_set = None
eval_weights = None
prediction_thresholds_smoothing = self.freqai_info.get(
"prediction_thresholds_smoothing", "quantile"
)
- smoothing_methods: dict = {
+ smoothing_methods: dict[str, Callable] = {
"quantile": self.quantile_min_max_pred,
"mean": QuickAdapterRegressorV3.mean_min_max_pred,
"median": QuickAdapterRegressorV3.median_min_max_pred,
prediction_thresholds_smoothing, smoothing_methods["quantile"]
)(pred_df, fit_live_predictions_candles, label_period_candles)
- def optuna_hp_optimize(
+ def optuna_optimize(
self,
pair: str,
- X: pd.DataFrame,
- y: pd.DataFrame,
- train_weights: np.ndarray,
- X_test: pd.DataFrame,
- y_test: pd.DataFrame,
- test_weights: np.ndarray,
- ) -> tuple[dict, float] | tuple[None, None]:
- namespace = "hp"
+ namespace: str,
+ objective: Callable[[optuna.Trial], float],
+ params_storage: dict[str, dict],
+ rmse_storage: dict[str, float],
+ ) -> None:
identifier = self.freqai_info["identifier"]
study = self.optuna_create_study(f"{identifier}-{namespace}-{pair}", pair)
- if study is None:
- return None, None
+ if not study:
+ return
if self.__optuna_config.get("warm_start"):
self.optuna_enqueue_previous_best_params(pair, study, namespace)
- def objective(trial: optuna.Trial) -> float:
- return hp_objective(
- trial,
- self.freqai_info.get("regressor", "xgboost"),
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- self.model_training_parameters,
+ logger.info(f"Optuna {namespace} hyperopt started")
+ start_time = time.time()
+ try:
+ study.optimize(
+ objective,
+ n_trials=self.__optuna_config["n_trials"],
+ n_jobs=self.__optuna_config["n_jobs"],
+ timeout=self.__optuna_config["timeout"],
)
-
- return self.optuna_process_study(
- study=study, pair=pair, namespace=namespace, objective=objective
- )
-
- def optuna_period_optimize(
- self,
- pair: str,
- X: pd.DataFrame,
- y: pd.DataFrame,
- train_weights: np.ndarray,
- X_test: pd.DataFrame,
- y_test: pd.DataFrame,
- test_weights: np.ndarray,
- model_training_parameters: dict,
- ) -> tuple[dict, float] | tuple[None, None]:
- namespace = "period"
- identifier = self.freqai_info["identifier"]
- study = self.optuna_create_study(f"{identifier}-{namespace}-{pair}", pair)
- if study is None:
- return None, None
-
- if self.__optuna_config.get("warm_start"):
- self.optuna_enqueue_previous_best_params(pair, study, namespace)
-
- def objective(trial: optuna.Trial) -> float:
- return period_objective(
- trial,
- self.freqai_info.get("regressor", "xgboost"),
- X,
- y,
- train_weights,
- X_test,
- y_test,
- test_weights,
- self.freqai_info.get("fit_live_predictions_candles", 100),
- self.__optuna_config.get("candles_step"),
- model_training_parameters,
+ except Exception as e:
+ time_spent = time.time() - start_time
+ logger.error(
+ f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): {str(e)}",
+ exc_info=True,
)
+ return
- return self.optuna_process_study(
- study=study, pair=pair, namespace=namespace, objective=objective
- )
+ time_spent = time.time() - start_time
+ if self.optuna_study_has_best_params(study):
+ params_storage[pair] = study.best_params
+ rmse_storage[pair] = study.best_value
+ logger.info(f"Optuna {namespace} hyperopt done ({time_spent:.2f} secs)")
+ for key, value in {
+ "rmse": rmse_storage[pair],
+ **params_storage[pair],
+ }.items():
+ logger.info(f"Optuna {namespace} hyperopt | {key:>20s} : {value}")
+ self.optuna_save_best_params(pair, namespace, params_storage[pair])
+ else:
+ logger.error(
+ f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
+ )
def optuna_storage(self, pair: str) -> Optional[optuna.storages.BaseStorage]:
storage_dir = self.full_path
try:
return optuna.create_study(
study_name=study_name,
- sampler=optuna.samplers.TPESampler(multivariate=True, group=True),
+ sampler=optuna.samplers.TPESampler(
+ multivariate=True, group=True, seed=1
+ ),
pruner=optuna.pruners.HyperbandPruner(),
direction=optuna.study.StudyDirection.MINIMIZE,
storage=storage,
def optuna_enqueue_previous_best_params(
self, pair: str, study: optuna.study.Study, namespace: str
) -> None:
- best_params = getattr(
- self, f"_{self.__class__.__name__}__optuna_{namespace}_params"
- ).get(pair)
+ if namespace == "hp":
+ best_params = self.__optuna_hp_params.get(pair, {})
+ elif namespace == "period":
+ best_params = self.__optuna_period_params.get(pair, {})
if best_params:
study.enqueue_trial(best_params)
else:
if best_params:
study.enqueue_trial(best_params)
- def optuna_handle_error(
- self, namespace: str, start_time: float, e: Exception
- ) -> None:
- time_spent = time.time() - start_time
- logger.error(
- f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): {str(e)}",
- exc_info=True,
- )
-
- def optuna_process_results(
- self, study: optuna.study.Study, pair: str, namespace: str, start_time: float
- ) -> tuple[dict, float] | tuple[None, None]:
- time_spent = time.time() - start_time
-
- if not self.optuna_study_has_best_params(study):
- logger.error(
- f"Optuna {namespace} hyperopt failed ({time_spent:.2f} secs): no study best params found"
- )
- return None, None
-
- params = study.best_params
- rmse = study.best_value
-
- logger.info(f"Optuna {namespace} hyperopt done ({time_spent:.2f} secs)")
- for key, value in {"rmse": rmse, **params}.items():
- logger.info(f"Optuna {namespace} hyperopt | {key:>20s} : {value}")
-
- if namespace == "hp":
- self.__optuna_hp_params[pair] = params
- self.__optuna_hp_rmse[pair] = rmse
- elif namespace == "period":
- self.__optuna_period_params[pair] = params
- self.__optuna_period_rmse[pair] = rmse
-
- self.optuna_save_best_params(pair, namespace, params)
-
- return params, rmse
-
- def optuna_process_study(
- self,
- study: optuna.study.Study,
- pair: str,
- namespace: str,
- objective: Callable[[optuna.Trial], float],
- ) -> tuple[dict, float] | tuple[None, None]:
- logger.info(f"Optuna {namespace} hyperopt started")
- start_time = time.time()
-
- try:
- study.optimize(
- objective,
- n_trials=self.__optuna_config.get("n_trials"),
- n_jobs=self.__optuna_config.get("n_jobs"),
- timeout=self.__optuna_config.get("timeout"),
- gc_after_trial=True,
- )
- except Exception as e:
- self.optuna_handle_error(namespace, start_time, e)
- return None, None
-
- return self.optuna_process_results(study, pair, namespace, start_time)
-
def optuna_save_best_params(
self, pair: str, namespace: str, best_params: dict
) -> None:
return False
@staticmethod
- def mean_min_max_pred(
+ def get_pred_df_sorted_and_label_period_frequency(
pred_df: pd.DataFrame,
fit_live_predictions_candles: int,
label_period_candles: int,
- ) -> tuple[pd.Series, pd.Series]:
+ ) -> tuple[pd.DataFrame, int]:
pred_df_sorted = (
pred_df[[EXTREMA_COLUMN]]
.copy()
.sort_values(by=EXTREMA_COLUMN, ascending=False)
.reset_index(drop=True)
)
+ label_period_frequency: int = max(
+ 1, int(fit_live_predictions_candles / (label_period_candles * 2))
+ )
- label_period_frequency: int = int(
- fit_live_predictions_candles / (label_period_candles * 2)
+ return pred_df_sorted, label_period_frequency
+
+ @staticmethod
+ def mean_min_max_pred(
+ pred_df: pd.DataFrame,
+ fit_live_predictions_candles: int,
+ label_period_candles: int,
+ ) -> tuple[pd.Series, pd.Series]:
+ pred_df_sorted, label_period_frequency = (
+ QuickAdapterRegressorV3.get_pred_df_sorted_and_label_period_frequency(
+ pred_df, fit_live_predictions_candles, label_period_candles
+ )
)
min_pred = pred_df_sorted.iloc[-label_period_frequency:].mean()
max_pred = pred_df_sorted.iloc[:label_period_frequency].mean()
fit_live_predictions_candles: int,
label_period_candles: int,
) -> tuple[pd.Series, pd.Series]:
- pred_df_sorted = (
- pred_df[[EXTREMA_COLUMN]]
- .copy()
- .sort_values(by=EXTREMA_COLUMN, ascending=False)
- .reset_index(drop=True)
- )
-
- label_period_frequency: int = int(
- fit_live_predictions_candles / (label_period_candles * 2)
+ pred_df_sorted, label_period_frequency = (
+ QuickAdapterRegressorV3.get_pred_df_sorted_and_label_period_frequency(
+ pred_df, fit_live_predictions_candles, label_period_candles
+ )
)
min_pred = pred_df_sorted.iloc[-label_period_frequency:].median()
max_pred = pred_df_sorted.iloc[:label_period_frequency].median()
fit_live_predictions_candles: int,
label_period_candles: int,
) -> tuple[pd.Series, pd.Series]:
- pred_df_sorted = (
- pred_df[[EXTREMA_COLUMN]]
- .copy()
- .sort_values(by=EXTREMA_COLUMN, ascending=False)
- .reset_index(drop=True)
- )
-
- label_period_frequency: int = int(
- fit_live_predictions_candles / (label_period_candles * 2)
+ pred_df_sorted, label_period_frequency = (
+ QuickAdapterRegressorV3.get_pred_df_sorted_and_label_period_frequency(
+ pred_df, fit_live_predictions_candles, label_period_candles
+ )
)
q = self.freqai_info.get("quantile", 0.75)
min_pred = pred_df_sorted.iloc[-label_period_frequency:].quantile(1 - q)
def train_regressor(
regressor: str,
- X,
- y,
- train_weights,
- eval_set,
- eval_weights,
+ X: pd.DataFrame,
+ y: pd.DataFrame,
+ train_weights: np.ndarray,
+ eval_set: Optional[list[tuple]],
+ eval_weights: Optional[list[np.ndarray]],
model_training_parameters: dict,
init_model: Any = None,
callbacks: list = None,
study_model_parameters = {
"learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
"min_child_weight": trial.suggest_int("min_child_weight", 1, 200),
- "subsample": trial.suggest_float("subsample", 0.6, 1.0),
- "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
+ "subsample": trial.suggest_float("subsample", 0.5, 1.0),
+ "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
"reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 10.0, log=True),
"reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 10.0, log=True),
}
if regressor == "xgboost":
study_model_parameters.update(
{
- "max_depth": trial.suggest_int("max_depth", 3, 18),
+ "max_depth": trial.suggest_int("max_depth", 3, 15),
+ "gamma": trial.suggest_float("gamma", 1e-8, 1.0, log=True),
}
)
elif regressor == "lightgbm":
study_model_parameters.update(
{
"num_leaves": trial.suggest_int("num_leaves", 2, 256),
- "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
+ "min_split_gain": trial.suggest_float(
+ "min_split_gain", 1e-8, 1.0, log=True
+ ),
+ "min_child_samples": trial.suggest_int("min_child_samples", 10, 100),
}
)
return study_model_parameters