model_params: Dict[str, Any] = copy.deepcopy(self.model_training_parameters)
if self.lr_schedule:
- lr = model_params.get("learning_rate", 0.0003)
+ lr = float(model_params.get("learning_rate", 0.0003))
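+ # SB3 calls schedule parameters with progress_remaining going 1.0 -> 0.0,
+ # so linear_schedule(lr) should decay the rate linearly over training.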
model_params["learning_rate"] = linear_schedule(lr)
logger.info("Learning rate linear schedule enabled, initial value: %s", lr)
if "PPO" in self.model_type and self.cr_schedule:
- cr = model_params.get("clip_range", 0.2)
+ cr = float(model_params.get("clip_range", 0.2))
model_params["clip_range"] = linear_schedule(cr)
logger.info("Clip range linear schedule enabled, initial value: %s", cr)
+ if "DQN" in self.model_type:
+ gradient_steps = model_params.get("gradient_steps")
+ if gradient_steps is None:
+ train_freq = model_params.get("train_freq")
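+ # SB3 accepts train_freq as an int or a (frequency, unit) tuple/list;
+ # only the integer frequency matters here.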
+ if isinstance(train_freq, (tuple, list)) and train_freq:
+ train_freq = (
+ train_freq[0] if isinstance(train_freq[0], int) else None
+ )
+ else:
+ train_freq = train_freq if isinstance(train_freq, int) else None
+ subsample_steps = model_params.get("subsample_steps")
+ if (
+ isinstance(train_freq, int)
+ and train_freq > 0
+ and isinstance(subsample_steps, int)
+ and subsample_steps > 0
+ ):
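+ # Clamp to [1, train_freq]: at least one update per rollout, at most
+ # one gradient step per collected env step.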
+ model_params["gradient_steps"] = min(
+ train_freq, max(train_freq // subsample_steps, 1)
+ )
+ else:
+ model_params["gradient_steps"] = 1
+
if not model_params.get("policy_kwargs"):
model_params["policy_kwargs"] = {}
else:
- model_params["policy_kwargs"]["net_arch"] = net_arch
+ if isinstance(net_arch, str):
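+ # A string net_arch names a preset; resolve it to concrete layer sizes
+ # for this algorithm family.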
+ model_params["policy_kwargs"]["net_arch"] = get_net_arch(
+ self.model_type, net_arch
+ )
+ else:
+ model_params["policy_kwargs"]["net_arch"] = net_arch
model_params["policy_kwargs"]["activation_fn"] = get_activation_fn(
model_params.get("policy_kwargs", {}).get("activation_fn", "relu")
model_params["policy_kwargs"]["optimizer_class"] = get_optimizer_class(
model_params.get("policy_kwargs", {}).get("optimizer_class", "adam")
)
+
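+ # Deep-copy on store and on return so callers cannot mutate the cached params.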
self._model_params_cache = copy.deepcopy(model_params)
return copy.deepcopy(self._model_params_cache)
def _on_training_start(self) -> None:
_lr = self.model.learning_rate
- _lr = _lr if isinstance(_lr, float) else "lr_schedule"
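+ # learning_rate may be a schedule callable; log a placeholder label instead.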
+ _lr = (
+ float(_lr) if isinstance(_lr, (int, float, np.floating)) else "lr_schedule"
+ )
hparam_dict: Dict[str, Any] = {
"algorithm": self.model.__class__.__name__,
"learning_rate": _lr,
- "gamma": self.model.gamma,
- "batch_size": self.model.batch_size,
+ "gamma": float(self.model.gamma),
+ "batch_size": int(self.model.batch_size),
}
if "PPO" in self.model.__class__.__name__:
_cr = self.model.clip_range
- _cr = _cr if isinstance(_cr, float) else "cr_schedule"
+ _cr = (
+ float(_cr)
+ if isinstance(_cr, (int, float, np.floating))
+ else "cr_schedule"
+ )
hparam_dict.update(
{
"clip_range": _cr,
- "gae_lambda": self.model.gae_lambda,
- "n_steps": self.model.n_steps,
- "n_epochs": self.model.n_epochs,
- "ent_coef": self.model.ent_coef,
- "vf_coef": self.model.vf_coef,
- "max_grad_norm": self.model.max_grad_norm,
+ "gae_lambda": float(self.model.gae_lambda),
+ "n_steps": int(self.model.n_steps),
+ "n_epochs": int(self.model.n_epochs),
+ "ent_coef": float(self.model.ent_coef),
+ "vf_coef": float(self.model.vf_coef),
+ "max_grad_norm": float(self.model.max_grad_norm),
}
)
if getattr(self.model, "target_kl", None) is not None:
- hparam_dict["target_kl"] = self.model.target_kl
+ hparam_dict["target_kl"] = float(self.model.target_kl)
if "DQN" in self.model.__class__.__name__:
hparam_dict.update(
{
- "buffer_size": self.model.buffer_size,
- "gradient_steps": self.model.gradient_steps,
- "learning_starts": self.model.learning_starts,
- "target_update_interval": self.model.target_update_interval,
- "exploration_initial_eps": self.model.exploration_initial_eps,
- "exploration_final_eps": self.model.exploration_final_eps,
- "exploration_fraction": self.model.exploration_fraction,
- "exploration_rate": self.model.exploration_rate,
+ "buffer_size": int(self.model.buffer_size),
+ "gradient_steps": int(self.model.gradient_steps),
+ "learning_starts": int(self.model.learning_starts),
+ "target_update_interval": int(self.model.target_update_interval),
+ "exploration_initial_eps": float(
+ self.model.exploration_initial_eps
+ ),
+ "exploration_final_eps": float(self.model.exploration_final_eps),
+ "exploration_fraction": float(self.model.exploration_fraction),
+ "exploration_rate": float(self.model.exploration_rate),
}
)
if "QRDQN" in self.model.__class__.__name__:
- hparam_dict.update({"n_quantiles": self.model.n_quantiles})
+ hparam_dict.update({"n_quantiles": int(self.model.n_quantiles)})
metric_dict = {
"info/total_reward": 0.0,
"info/total_profit": 1.0,
for k, values in numeric_acc.items():
if not values:
continue
- mean_v = sum(values) / len(values)
- aggregated_info[k] = mean_v
+ values_mean = sum(values) / len(values)
+ aggregated_info[k] = values_mean
if len(values) > 1:
try:
aggregated_info[f"{k}_std"] = stdev(values)
except Exception:
pass
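+ # Also record min/max for the headline metrics, not just mean/std.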
+ for key in ("reward", "pnl"):
+ values = numeric_acc.get(key, [])
+ if values:
+ try:
+ aggregated_info[f"{key}_min"] = float(min(values))
+ aggregated_info[f"{key}_max"] = float(max(values))
+ except Exception:
+ pass
for k, values in non_numeric_acc.items():
if len(values) == 1:
aggregated_info[k] = next(iter(values))
if optuna_params.get("target_kl") is not None:
model_params["target_kl"] = optuna_params.get("target_kl")
elif "DQN" in model_type:
+ train_freq = optuna_params.get("train_freq")
+ subsample_steps = optuna_params.get("subsample_steps")
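+ # Mirror get_model_params: derive gradient_steps from train_freq and
+ # subsample_steps rather than sampling it directly.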
model_params.update(
{
"gamma": optuna_params.get("gamma"),
"batch_size": optuna_params.get("batch_size"),
"learning_rate": lr,
"buffer_size": optuna_params.get("buffer_size"),
- "train_freq": optuna_params.get("train_freq"),
- "gradient_steps": optuna_params.get("gradient_steps"),
+ "train_freq": train_freq,
+ "gradient_steps": min(
+ train_freq,
+ max(
+ train_freq // subsample_steps,
+ 1,
+ ),
+ ),
"exploration_fraction": optuna_params.get("exploration_fraction"),
"exploration_initial_eps": optuna_params.get("exploration_initial_eps"),
"exploration_final_eps": optuna_params.get("exploration_final_eps"),
def get_common_dqn_optuna_params(trial: Trial) -> Dict[str, Any]:
- train_freq = trial.suggest_categorical(
- "train_freq", [2, 4, 8, 16, 128, 256, 512, 1024]
- )
exploration_final_eps = trial.suggest_float(
"exploration_final_eps", 0.01, 0.2, step=0.01
)
if learning_starts >= buffer_size:
raise TrialPruned("learning_starts must be less than buffer_size")
return {
- "train_freq": train_freq,
- "gradient_steps": min(
- train_freq,
- max(
- train_freq // trial.suggest_categorical("subsample_steps", [2, 4, 8]),
- 1,
- ),
+ "train_freq": trial.suggest_categorical(
+ "train_freq", [2, 4, 8, 16, 128, 256, 512, 1024]
),
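+ # subsample_steps is not an SB3 argument; it is consumed when converting
+ # optuna params to model params to derive gradient_steps.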
+ "subsample_steps": trial.suggest_categorical("subsample_steps", [2, 4, 8]),
"gamma": trial.suggest_categorical(
"gamma", [0.93, 0.95, 0.97, 0.98, 0.99, 0.995, 0.997, 0.999, 0.9999]
),
Sampler for QRDQN hyperparams
"""
dqn_optuna_params = get_common_dqn_optuna_params(trial)
- dqn_optuna_params.update({"n_quantiles": trial.suggest_int("n_quantiles", 10, 128)})
+ dqn_optuna_params.update({"n_quantiles": trial.suggest_int("n_quantiles", 10, 160)})
return convert_optuna_params_to_model_params("QRDQN", dqn_optuna_params)
win_type: Literal["gaussian", "kaiser", "triang"],
std: float,
beta: float,
-) -> NDArray[np.float64]:
+) -> NDArray[np.floating]:
if win_type == "gaussian":
coeffs = sp.signal.windows.gaussian(M=window, std=std, sym=True)
elif win_type == "kaiser":
@lru_cache(maxsize=8)
def get_ma_fn(
mamode: str,
-) -> Callable[[pd.Series | NDArray[np.float64], int], pd.Series | NDArray[np.float64]]:
+) -> Callable[
+ [pd.Series | NDArray[np.floating], int], pd.Series | NDArray[np.floating]
+]:
mamodes: dict[
str,
Callable[
- [pd.Series | NDArray[np.float64], int], pd.Series | NDArray[np.float64]
+ [pd.Series | NDArray[np.floating], int], pd.Series | NDArray[np.floating]
],
] = {
"sma": ta.SMA,
@lru_cache(maxsize=8)
def get_zl_ma_fn(
mamode: str,
-) -> Callable[[pd.Series | NDArray[np.float64], int], pd.Series | NDArray[np.float64]]:
+) -> Callable[
+ [pd.Series | NDArray[np.floating], int], pd.Series | NDArray[np.floating]
+]:
ma_fn = get_ma_fn(mamode)
return lambda series, timeperiod: ma_fn(
calculate_zero_lag(series, timeperiod), timeperiod=timeperiod
)
def _fractal_dimension(
- highs: NDArray[np.float64], lows: NDArray[np.float64], period: int
+ highs: NDArray[np.floating], lows: NDArray[np.floating], period: int
) -> float:
"""Original fractal dimension computation implementation per Ehlers' paper."""
if period % 2 != 0:
return fractal_highs, fractal_lows
-def calculate_quantile(values: NDArray[np.float64], value: float) -> float:
+def calculate_quantile(values: NDArray[np.floating], value: float) -> float:
if values.size == 0:
return np.nan
natr_values = (ta.NATR(df, timeperiod=natr_period).bfill() / 100.0).to_numpy()
indices: list[int] = df.index.tolist()
- thresholds: NDArray[np.float64] = natr_values * natr_ratio
+ thresholds: NDArray[np.floating] = natr_values * natr_ratio
closes = df.get("close").to_numpy()
highs = df.get("high").to_numpy()
lows = df.get("low").to_numpy()
regressor: str,
X: pd.DataFrame,
y: pd.DataFrame,
- train_weights: NDArray[np.float64],
+ train_weights: NDArray[np.floating],
eval_set: Optional[list[tuple[pd.DataFrame, pd.DataFrame]]],
- eval_weights: Optional[list[NDArray[np.float64]]],
+ eval_weights: Optional[list[NDArray[np.floating]]],
model_training_parameters: dict[str, Any],
init_model: Any = None,
callbacks: Optional[list[Callable]] = None,