},
"rl_config_optuna": {
"enabled": false, // Enable optuna hyperopt
+ "per_pair: false, // Enable per pair hyperopt
"n_trials": 100,
"n_startup_trials": 10,
"timeout_hours": 0,
output = output.rolling(window=self.CONV_WIDTH).apply(_predict)
return output
- def get_storage(self, pair: str) -> BaseStorage:
+ def get_storage(self, pair: str | None = None) -> BaseStorage:
"""
Get the storage for Optuna
"""
storage_dir = str(self.full_path)
+ storage_filename = f"optuna-{pair.split('/')[0]}" if pair else "optuna"
storage_backend = self.rl_config_optuna.get("storage", "sqlite")
if storage_backend == "sqlite":
- storage = f"sqlite:///{storage_dir}/optuna-{pair.split('/')[0]}.sqlite"
+ storage = f"sqlite:///{storage_dir}/{storage_filename}.sqlite"
elif storage_backend == "file":
storage = JournalStorage(
- JournalFileBackend(f"{storage_dir}/optuna-{pair.split('/')[0]}.log")
+ JournalFileBackend(f"{storage_dir}/{storage_filename}.log")
)
return storage
Runs hyperparameter optimization using Optuna and
returns the best hyperparameters found
"""
- study_name = str(dk.pair)
- storage = self.get_storage(dk.pair)
+ _, identifier = str(self.full_path).rsplit("/", 1)
+ if self.rl_config_optuna.get("per_pair", False):
+ study_name = f"{identifier}-{dk.pair}"
+ storage = self.get_storage(dk.pair)
+ else:
+ study_name = identifier
+ storage = self.get_storage()
study: Study = create_study(
study_name=study_name,
sampler=TPESampler(
logger.info(
"------------ Hyperopt results %s (%.2f secs) ------------",
- dk.pair,
+ study_name,
time_spent,
)
logger.info(
logger.info("Best trial params: %s", study.best_trial.params)
logger.info("-------------------------------------------------------")
- self.save_best_params(dk.pair, study.best_trial.params)
+ self.save_best_params(
+ study.best_trial.params,
+ dk.pair if self.rl_config_optuna.get("per_pair", False) else None,
+ )
return study.best_trial.params
- def save_best_params(self, pair: str, best_params: Dict) -> None:
+ def save_best_params(self, best_params: Dict, pair: str | None = None) -> None:
"""
Save the best hyperparameters found during hyperparameter optimization
"""
- best_params_path = Path(
- self.full_path / f"hyperopt-best-params-{pair.split('/')[0]}.json"
+ best_params_filename = (
+ f"hyperopt-best-params-{pair.split('/')[0]}"
+ if pair
+ else "hyperopt-best-params"
)
- logger.info(f"{pair}: saving best params to %s JSON file", best_params_path)
+ best_params_path = Path(self.full_path / f"{best_params_filename}.json")
+ log_msg: str = (
+ f"{pair}: saving best params to {best_params_path} JSON file"
+ if pair
+ else f"saving best params to {best_params_path} JSON file"
+ )
+ logger.info(log_msg)
with best_params_path.open("w", encoding="utf-8") as write_file:
json.dump(best_params, write_file, indent=4)
- def load_best_params(self, pair: str) -> Dict | None:
+ def load_best_params(self, pair: str | None = None) -> Dict | None:
"""
Load the best hyperparameters found and saved during hyperparameter optimization
"""
- best_params_path = Path(
- self.full_path / f"hyperopt-best-params-{pair.split('/')[0]}.json"
+ best_params_filename = (
+ f"hyperopt-best-params-{pair.split('/')[0]}"
+ if pair
+ else "hyperopt-best-params.json"
+ )
+ best_params_path = Path(self.full_path / f"{best_params_filename}.json")
+ log_msg: str = (
+ f"{pair}: loading best params from {best_params_path} JSON file"
+ if pair
+ else f"loading best params from {best_params_path} JSON file"
)
if best_params_path.is_file():
- logger.info(
- f"{pair}: loading best params from %s JSON file", best_params_path
- )
+ logger.info(log_msg)
with best_params_path.open("r", encoding="utf-8") as read_file:
best_params = json.load(read_file)
return best_params
else:
tensorboard_log_path = None
- logger.info(
- "------------ Hyperopt trial %d %s ------------", trial.number, dk.pair
- )
+ logger.info("------------ Hyperopt trial %d ------------", trial.number)
logger.info("Trial %s params: %s", trial.number, params)
model = self.MODELCLASS(
def optuna_storage(self, pair: str) -> optuna.storages.BaseStorage:
storage_dir = str(self.full_path)
+ storage_filename = f"optuna-{pair.split('/')[0]}"
storage_backend = self.__optuna_config.get("storage", "file")
if storage_backend == "sqlite":
- storage = f"sqlite:///{storage_dir}/optuna-{pair.split('/')[0]}.sqlite"
+ storage = f"sqlite:///{storage_dir}/{storage_filename}.sqlite"
elif storage_backend == "file":
storage = optuna.storages.JournalStorage(
optuna.storages.journal.JournalFileBackend(
- f"{storage_dir}/optuna-{pair.split('/')[0]}.log"
+ f"{storage_dir}/{storage_filename}.log"
)
)
return storage
y_test,
test_weights,
) -> tuple[Dict | None, float | None]:
+ _, identifier = str(self.full_path).rsplit("/", 1)
study_namespace = "hp"
- study_name = f"{study_namespace}-{pair}"
+ study_name = f"{identifier}-{study_namespace}-{pair}"
storage = self.optuna_storage(pair)
pruner = optuna.pruners.HyperbandPruner()
self.optuna_study_delete(study_name, storage)
test_weights,
model_training_parameters,
) -> tuple[Dict | None, float | None]:
+ _, identifier = str(self.full_path).rsplit("/", 1)
study_namespace = "period"
- study_name = f"{study_namespace}-{pair}"
+ study_name = f"{identifier}-{study_namespace}-{pair}"
storage = self.optuna_storage(pair)
pruner = optuna.pruners.HyperbandPruner()
self.optuna_study_delete(study_name, storage)
def optuna_storage(self, pair: str) -> optuna.storages.BaseStorage:
storage_dir = str(self.full_path)
+ storage_filename = f"optuna-{pair.split('/')[0]}"
storage_backend = self.__optuna_config.get("storage", "file")
if storage_backend == "sqlite":
- storage = f"sqlite:///{storage_dir}/optuna-{pair.split('/')[0]}.sqlite"
+ storage = f"sqlite:///{storage_dir}/{storage_filename}.sqlite"
elif storage_backend == "file":
storage = optuna.storages.JournalStorage(
optuna.storages.journal.JournalFileBackend(
- f"{storage_dir}/optuna-{pair.split('/')[0]}.log"
+ f"{storage_dir}/{storage_filename}.log"
)
)
return storage
y_test,
test_weights,
) -> tuple[Dict | None, float | None]:
+ _, identifier = str(self.full_path).rsplit("/", 1)
study_namespace = "hp"
- study_name = f"{study_namespace}-{pair}"
+ study_name = f"{identifier}-{study_namespace}-{pair}"
storage = self.optuna_storage(pair)
pruner = optuna.pruners.HyperbandPruner()
self.optuna_study_delete(study_name, storage)
test_weights,
model_training_parameters,
) -> tuple[Dict | None, float | None]:
+ _, identifier = str(self.full_path).rsplit("/", 1)
study_namespace = "period"
- study_name = f"{study_namespace}-{pair}"
+ study_name = f"{identifier}-{study_namespace}-{pair}"
storage = self.optuna_storage(pair)
pruner = optuna.pruners.HyperbandPruner()
self.optuna_study_delete(study_name, storage)