from functools import lru_cache
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple, Type
+from collections.abc import Mapping
import matplotlib
import matplotlib.pyplot as plt
train_df = data_dictionary.get("train_features")
test_df = data_dictionary.get("test_features")
env_dict = self.pack_env_dict(dk.pair)
- seed = self.model_training_parameters.get("seed", 42)
+ seed = self.get_model_params().get("seed", 42)
if self.check_envs:
logger.info("Checking environments...")
model_params: Dict[str, Any] = copy.deepcopy(self.model_training_parameters)
if self.lr_schedule:
- _lr = model_params.get("learning_rate", 0.0003)
- model_params["learning_rate"] = linear_schedule(_lr)
- logger.info("Learning rate linear schedule enabled, initial value: %s", _lr)
+ lr = model_params.get("learning_rate", 0.0003)
+ model_params["learning_rate"] = linear_schedule(lr)
+ logger.info("Learning rate linear schedule enabled, initial value: %s", lr)
if "PPO" in self.model_type and self.cr_schedule:
- _cr = model_params.get("clip_range", 0.2)
- model_params["clip_range"] = linear_schedule(_cr)
- logger.info("Clip range linear schedule enabled, initial value: %s", _cr)
+ cr = model_params.get("clip_range", 0.2)
+ model_params["clip_range"] = linear_schedule(cr)
+ logger.info("Clip range linear schedule enabled, initial value: %s", cr)
if not model_params.get("policy_kwargs"):
model_params["policy_kwargs"] = {}
net_arch = model_params.get("policy_kwargs", {}).get("net_arch", [128, 128])
if "PPO" in self.model_type:
- model_params["policy_kwargs"]["net_arch"] = {
- "pi": net_arch,
- "vf": net_arch,
- }
+ if not isinstance(net_arch, dict):
+ model_params["policy_kwargs"]["net_arch"] = {
+ "pi": net_arch,
+ "vf": net_arch,
+ }
else:
model_params["policy_kwargs"]["net_arch"] = net_arch
callbacks: list[BaseCallback] = []
no_improvement_callback = None
rollout_plot_callback = None
- verbose = self.model_training_parameters.get("verbose", 0)
+ verbose = self.get_model_params().get("verbose", 0)
if self.plot_new_best:
rollout_plot_callback = RolloutPlotCallback(verbose=verbose)
returns the best hyperparameters found merged with the user-defined parameters
"""
identifier = self.freqai_info.get("identifier", "no_id_provided")
- if self.rl_config_optuna.get("per_pair", False):
- study_name = f"{identifier}-{dk.pair}"
- storage = self.get_storage(dk.pair)
- else:
- study_name = identifier
- storage = self.get_storage()
+ per_pair = self.rl_config_optuna.get("per_pair", False)
+ study_name = f"{identifier}-{dk.pair}" if per_pair else identifier
+ storage = self.get_storage(dk.pair) if per_pair else self.get_storage()
eval_freq = max(1, len(train_df) // self.n_envs)
study: Study = create_study(
study_name=study_name,
)
hyperopt_failed = True
time_spent = time.time() - start_time
- if ReforceXY.study_has_best_trial(study) is False:
+ if not ReforceXY.study_has_best_trial(study):
logger.error(
f"Hyperopt {study_name} failed ({time_spent:.2f} secs): no study best trial found"
)
study_name,
time_spent,
)
- logger.info(
- "Best trial: %s. Score: %s", study.best_trial.number, study.best_trial.value
- )
+ if ReforceXY.study_has_best_trial(study):
+ logger.info(
+ "Best trial: %s. Score: %s",
+ study.best_trial.number,
+ study.best_trial.value,
+ )
logger.info("Best trial params: %s", best_trial_params)
logger.info("-------------------------------------------------------")
dk.pair if self.rl_config_optuna.get("per_pair", False) else None,
)
- return {**self.model_training_parameters, **best_trial_params}
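+ # Best trial params (converted to model params) take precedence over user-defined ones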
+ return deepmerge(
+ self.get_model_params(),
+ convert_optuna_params_to_model_params(self.model_type, best_trial_params),
+ )
def save_best_trial_params(
self, best_trial_params: Dict[str, Any], pair: Optional[str] = None
else:
raise NotImplementedError
- # Ensure that the sampled parameters take precedence
- params = {**self.model_training_parameters, **params}
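+ # Sampled params take precedence: deepmerge lets src override dst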
+ params = deepmerge(self.get_model_params(), params)
nan_encountered = False
"gradient_steps": self.model.gradient_steps,
"learning_starts": self.model.learning_starts,
"target_update_interval": self.model.target_update_interval,
- "exploration_fraction": self.model.exploration_fraction,
"exploration_initial_eps": self.model.exploration_initial_eps,
"exploration_final_eps": self.model.exploration_final_eps,
+ "exploration_fraction": self.model.exploration_fraction,
"exploration_rate": self.model.exploration_rate,
}
)
return _init
+def deepmerge(dst: dict[str, Any], src: dict[str, Any]) -> dict[str, Any]:
+ """Recursively merge two dicts without mutating inputs"""
+ dst_copy = copy.deepcopy(dst)
+ for k, v in src.items():
+ if (
+ k in dst_copy
+ and isinstance(dst_copy[k], Mapping)
+ and isinstance(v, Mapping)
+ ):
+ dst_copy[k] = deepmerge(dst_copy[k], v)
+ else:
+ dst_copy[k] = v
+ return dst_copy
+
+
def linear_schedule(initial_value: float) -> Callable[[float], float]:
def func(progress_remaining: float) -> float:
return progress_remaining * initial_value
}.get(optimizer_class_name, th.optim.Adam)
+def convert_optuna_params_to_model_params(
+ model_type: str, optuna_params: Dict[str, Any]
+) -> Dict[str, Any]:
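+ """Convert flat sampled Optuna params into model training params, applying schedules and nesting policy_kwargs"""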
+ model_params = {}
+ policy_kwargs = {}
+
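+ # Wrap sampled rates in linear decay schedules when the corresponding *_schedule param is "linear"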
+ lr = optuna_params.get("learning_rate")
+ if lr is not None and optuna_params.get("lr_schedule") == "linear":
+ lr = linear_schedule(lr)
+
+ if "PPO" in model_type:
+ cr = optuna_params.get("clip_range")
+ if cr is not None and optuna_params.get("cr_schedule") == "linear":
+ cr = linear_schedule(cr)
+ model_params.update(
+ {
+ "n_steps": optuna_params.get("n_steps"),
+ "batch_size": optuna_params.get("batch_size"),
+ "gamma": optuna_params.get("gamma"),
+ "learning_rate": lr,
+ "ent_coef": optuna_params.get("ent_coef"),
+ "clip_range": cr,
+ "n_epochs": optuna_params.get("n_epochs"),
+ "gae_lambda": optuna_params.get("gae_lambda"),
+ "max_grad_norm": optuna_params.get("max_grad_norm"),
+ "vf_coef": optuna_params.get("vf_coef"),
+ }
+ )
+ if optuna_params.get("target_kl") is not None:
+ model_params["target_kl"] = optuna_params.get("target_kl")
+ elif "DQN" in model_type:
+ model_params.update(
+ {
+ "gamma": optuna_params.get("gamma"),
+ "batch_size": optuna_params.get("batch_size"),
+ "learning_rate": lr,
+ "buffer_size": optuna_params.get("buffer_size"),
+ "train_freq": optuna_params.get("train_freq"),
+ "gradient_steps": optuna_params.get("gradient_steps"),
+ "exploration_fraction": optuna_params.get("exploration_fraction"),
+ "exploration_initial_eps": optuna_params.get("exploration_initial_eps"),
+ "exploration_final_eps": optuna_params.get("exploration_final_eps"),
+ "target_update_interval": optuna_params.get("target_update_interval"),
+ "learning_starts": optuna_params.get("learning_starts"),
+ }
+ )
+ if "QRDQN" in model_type and optuna_params.get("n_quantiles"):
+ policy_kwargs["n_quantiles"] = optuna_params["n_quantiles"]
+ else:
+ raise ValueError(f"Model {model_type} not supported")
+
+ if optuna_params.get("net_arch"):
+ policy_kwargs["net_arch"] = get_net_arch(model_type, optuna_params["net_arch"])
+ if optuna_params.get("activation_fn"):
+ policy_kwargs["activation_fn"] = get_activation_fn(
+ optuna_params["activation_fn"]
+ )
+ if optuna_params.get("optimizer_class"):
+ policy_kwargs["optimizer_class"] = get_optimizer_class(
+ optuna_params["optimizer_class"]
+ )
+ if optuna_params.get("ortho_init") is not None:
+ policy_kwargs["ortho_init"] = optuna_params["ortho_init"]
+ model_params["policy_kwargs"] = policy_kwargs
+
+ return model_params
+
+
def sample_params_ppo(trial: Trial) -> Dict[str, Any]:
"""
Sampler for PPO hyperparams
"""
- batch_size = trial.suggest_categorical("batch_size", [64, 256, 512, 1024, 10240])
- n_steps = trial.suggest_categorical("n_steps", [512, 1024, 10240])
- gamma = trial.suggest_float("gamma", 0.1, 0.99, step=0.01)
- learning_rate = trial.suggest_float("learning_rate", 1e-6, 0.01, log=True)
- ent_coef = trial.suggest_float("ent_coef", 0.001, 0.1, log=True)
- clip_range = trial.suggest_categorical("clip_range", [0.1, 0.2, 0.3, 0.4, 0.5])
- n_epochs = trial.suggest_categorical("n_epochs", [1, 5, 10, 20])
- gae_lambda = trial.suggest_float("gae_lambda", 0.1, 0.99, step=0.01)
- max_grad_norm = trial.suggest_float("max_grad_norm", 0.1, 5, step=0.01)
- vf_coef = trial.suggest_float("vf_coef", 0, 1, step=0.01)
- ortho_init = trial.suggest_categorical("ortho_init", [False, True])
- lr_schedule = trial.suggest_categorical("lr_schedule", ["linear", "constant"])
- if lr_schedule == "linear":
- learning_rate = linear_schedule(learning_rate)
- cr_schedule = trial.suggest_categorical("cr_schedule", ["linear", "constant"])
- if cr_schedule == "linear":
- clip_range = linear_schedule(clip_range)
- if batch_size > n_steps:
- batch_size = n_steps
- net_arch_type: str = trial.suggest_categorical(
- "net_arch", ["small", "medium", "large", "extra_large"]
- )
- net_arch = get_net_arch("PPO", net_arch_type)
- activation_fn_name: str = trial.suggest_categorical(
- "activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
+ n_steps = trial.suggest_categorical("n_steps", [512, 1024, 2048, 4096])
+ batch_size_candidates = [64, 128, 256, 512, 1024]
+ batch_size_suggestions = [
+ b for b in batch_size_candidates if b <= n_steps and n_steps % b == 0
+ ]
+ if not batch_size_suggestions:
+ batch_size_suggestions = [b for b in batch_size_candidates if b <= n_steps]
+ batch_size = trial.suggest_categorical("batch_size", batch_size_suggestions)
+ return convert_optuna_params_to_model_params(
+ "PPO",
+ {
+ "n_steps": n_steps,
+ "batch_size": batch_size,
+ "gamma": trial.suggest_categorical(
+ "gamma", [0.93, 0.95, 0.97, 0.98, 0.99, 0.995, 0.997, 0.999, 0.9999]
+ ),
+ "learning_rate": trial.suggest_float("learning_rate", 1e-5, 3e-3, log=True),
+ "ent_coef": trial.suggest_float("ent_coef", 0.0005, 0.03, log=True),
+ "clip_range": trial.suggest_categorical("clip_range", [0.1, 0.2, 0.3]),
+ "n_epochs": trial.suggest_categorical("n_epochs", [1, 2, 3, 4, 5]),
+ "gae_lambda": trial.suggest_float("gae_lambda", 0.9, 0.98, step=0.01),
+ "max_grad_norm": trial.suggest_float("max_grad_norm", 0.3, 1.0, step=0.05),
+ "vf_coef": trial.suggest_float("vf_coef", 0.0, 1.0, step=0.05),
+ "lr_schedule": trial.suggest_categorical(
+ "lr_schedule", ["linear", "constant"]
+ ),
+ "cr_schedule": trial.suggest_categorical(
+ "cr_schedule", ["linear", "constant"]
+ ),
+ "target_kl": trial.suggest_categorical(
+ "target_kl", [None, 0.01, 0.015, 0.02, 0.03, 0.04]
+ ),
+ "ortho_init": trial.suggest_categorical("ortho_init", [False, True]),
+ "net_arch": trial.suggest_categorical(
+ "net_arch", ["small", "medium", "large", "extra_large"]
+ ),
+ "activation_fn": trial.suggest_categorical(
+ "activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
+ ),
+ "optimizer_class": trial.suggest_categorical("optimizer_class", ["adam"]),
+ },
)
- activation_fn = get_activation_fn(activation_fn_name)
- optimizer_class_name = trial.suggest_categorical("optimizer_class", ["adam"])
- optimizer_class = get_optimizer_class(optimizer_class_name)
- return {
- "n_steps": n_steps,
- "batch_size": batch_size,
- "gamma": gamma,
- "learning_rate": learning_rate,
- "ent_coef": ent_coef,
- "clip_range": clip_range,
- "n_epochs": n_epochs,
- "gae_lambda": gae_lambda,
- "max_grad_norm": max_grad_norm,
- "vf_coef": vf_coef,
- "policy_kwargs": dict(
- net_arch=net_arch,
- activation_fn=activation_fn,
- optimizer_class=optimizer_class,
- ortho_init=ortho_init,
- ),
- }
-def sample_params_dqn(trial: Trial) -> Dict[str, Any]:
- """
- Sampler for DQN hyperparams
- """
- gamma = trial.suggest_categorical(
- "gamma", [0.9, 0.95, 0.98, 0.99, 0.995, 0.999, 0.9999]
- )
- batch_size = trial.suggest_categorical("batch_size", [64, 128, 256, 512])
- learning_rate = trial.suggest_float("learning_rate", 1e-6, 0.01, log=True)
- lr_schedule = trial.suggest_categorical("lr_schedule", ["linear", "constant"])
- if lr_schedule == "linear":
- learning_rate = linear_schedule(learning_rate)
- buffer_size = trial.suggest_categorical(
- "buffer_size", [int(1e4), int(5e4), int(1e5)]
+def get_common_dqn_optuna_params(trial: Trial) -> Dict[str, Any]:
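+ """Sample the Optuna hyperparams shared by DQN and QRDQN"""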
+ train_freq = trial.suggest_categorical(
+ "train_freq", [2, 4, 8, 16, 128, 256, 512, 1024]
)
exploration_final_eps = trial.suggest_float(
- "exploration_final_eps", 0, 0.2, step=0.1
+ "exploration_final_eps", 0.01, 0.2, step=0.01
)
- exploration_fraction = trial.suggest_float("exploration_fraction", 0, 0.5, step=0.1)
- target_update_interval = trial.suggest_categorical(
- "target_update_interval", [1000, 5000, 10000]
+ exploration_initial_eps = trial.suggest_float(
+ "exploration_initial_eps", exploration_final_eps, 1.0
)
- learning_starts = trial.suggest_categorical("learning_starts", [1000, 5000, 10000])
- train_freq = trial.suggest_categorical(
- "train_freq", [2, 4, 8, 16, 128, 256, 512, 1024]
+ if exploration_initial_eps >= 0.9:
+ min_fraction = 0.2
+ elif (exploration_initial_eps - exploration_final_eps) > 0.5:
+ min_fraction = 0.15
+ else:
+ min_fraction = 0.05
+ exploration_fraction = trial.suggest_float(
+ "exploration_fraction", min_fraction, 0.9, step=0.02
)
- subsample_steps = trial.suggest_categorical("subsample_steps", [2, 4, 8])
- gradient_steps = max(train_freq // subsample_steps, 1)
- net_arch_type: str = trial.suggest_categorical(
- "net_arch", ["small", "medium", "large", "extra_large"]
+
+ buffer_size = trial.suggest_categorical(
+ "buffer_size", [int(1e4), int(5e4), int(1e5), int(2e5)]
)
- net_arch = get_net_arch("DQN", net_arch_type)
- activation_fn_name: str = trial.suggest_categorical(
- "activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
+ learning_starts_suggestions = [
+ v
+ for v in [500, 1000, 2000, 3000, 4000, 5000]
+ if v <= min(int(buffer_size * 0.05), 5000)
+ ]
+ if not learning_starts_suggestions:
+ learning_starts_suggestions = [1000]
+ learning_starts = trial.suggest_categorical(
+ "learning_starts", learning_starts_suggestions
)
- activation_fn = get_activation_fn(activation_fn_name)
- optimizer_class_name = trial.suggest_categorical("optimizer_class", ["adam"])
- optimizer_class = get_optimizer_class(optimizer_class_name)
return {
- "gamma": gamma,
- "batch_size": batch_size,
- "learning_rate": learning_rate,
- "buffer_size": buffer_size,
"train_freq": train_freq,
- "gradient_steps": gradient_steps,
- "exploration_fraction": exploration_fraction,
+ "gradient_steps": max(
+ train_freq // trial.suggest_categorical("subsample_steps", [2, 4, 8]), 1
+ ),
+ "gamma": trial.suggest_categorical(
+ "gamma", [0.93, 0.95, 0.97, 0.98, 0.99, 0.995, 0.997, 0.999, 0.9999]
+ ),
+ "batch_size": trial.suggest_categorical(
+ "batch_size", [64, 128, 256, 512, 1024]
+ ),
+ "learning_rate": trial.suggest_float("learning_rate", 1e-5, 3e-3, log=True),
+ "lr_schedule": trial.suggest_categorical("lr_schedule", ["linear", "constant"]),
+ "buffer_size": buffer_size,
+ "exploration_initial_eps": exploration_initial_eps,
"exploration_final_eps": exploration_final_eps,
- "target_update_interval": target_update_interval,
+ "exploration_fraction": exploration_fraction,
+ "target_update_interval": trial.suggest_categorical(
+ "target_update_interval", [1000, 2000, 5000, 7500, 10000]
+ ),
"learning_starts": learning_starts,
- "policy_kwargs": dict(
- net_arch=net_arch,
- activation_fn=activation_fn,
- optimizer_class=optimizer_class,
+ "net_arch": trial.suggest_categorical(
+ "net_arch", ["small", "medium", "large", "extra_large"]
),
+ "activation_fn": trial.suggest_categorical(
+ "activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
+ ),
+ "optimizer_class": trial.suggest_categorical("optimizer_class", ["adam"]),
}
+def sample_params_dqn(trial: Trial) -> Dict[str, Any]:
+ """
+ Sampler for DQN hyperparams
+ """
+ return convert_optuna_params_to_model_params(
+ "DQN", get_common_dqn_optuna_params(trial)
+ )
+
+
def sample_params_qrdqn(trial: Trial) -> Dict[str, Any]:
"""
Sampler for QRDQN hyperparams
"""
- params = sample_params_dqn(trial)
- n_quantiles = trial.suggest_int("n_quantiles", 5, 200)
- params["policy_kwargs"].update({"n_quantiles": n_quantiles})
- return params
+ dqn_optuna_params = get_common_dqn_optuna_params(trial)
+ dqn_optuna_params["n_quantiles"] = trial.suggest_int("n_quantiles", 10, 128)
+ return convert_optuna_params_to_model_params("QRDQN", dqn_optuna_params)