_STORAGE_BACKENDS: Final[Tuple[StorageBackend, ...]] = ("sqlite", "file")
_SAMPLER_TYPES: Final[Tuple[SamplerType, ...]] = ("tpe", "auto")
_PPO_N_STEPS: Final[Tuple[int, ...]] = (512, 1024, 2048, 4096)
+ _PPO_N_STEPS_MIN: Final[int] = min(_PPO_N_STEPS)
+ _PPO_N_STEPS_MAX: Final[int] = max(_PPO_N_STEPS)
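+ # Divisor applied to eval_freq during hyperopt: evaluating more often gives
+ # the pruner more decision points, so unpromising trials exit sooner.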
+ _HYPEROPT_EVAL_FREQ_REDUCTION_FACTOR: Final[float] = 4.0
_action_masks_cache: ClassVar[Dict[Tuple[bool, float], NDArray[np.bool_]]] = {}
self,
total_timesteps: int,
hyperopt: bool = False,
- hyperopt_reduction_factor: float = 4.0,
+ hyperopt_reduction_factor: float = _HYPEROPT_EVAL_FREQ_REDUCTION_FACTOR,
model_params: Optional[Dict[str, Any]] = None,
) -> int:
- """
- Calculate evaluation frequency (number of steps between evaluations).
-
- For PPO:
- - Use n_steps from model_params if available
- - Otherwise, select the largest value from ReforceXY._PPO_N_STEPS that is <= total_timesteps
-
- For DQN:
- - Use n_eval_steps divided by n_envs (rounded up)
+ """Calculate evaluation frequency.
- For hyperopt:
- - Reduce eval_freq by hyperopt_reduction_factor to speed up trials
+ Stable-Baselines3 triggers evaluation every `eval_freq` callback calls (`n_calls`). With
+ `n_envs > 1`, one callback call corresponds to `n_envs` timesteps, so this method works in
+ `n_calls` units.
- Args:
- total_timesteps: Total training timesteps
- hyperopt: If True, reduce eval_freq for hyperopt
- hyperopt_reduction_factor: Reduction factor for hyperopt (default: 4.0)
- model_params: Model parameters (to get n_steps for PPO)
+ - PPO: prefer `n_steps` from `model_params`; otherwise fall back to the
+ largest `ReforceXY._PPO_N_STEPS` value that fits within the call budget.
+ - DQN: use `ceil(n_eval_steps / n_envs)`.
+ - Hyperopt: divide the interval by `hyperopt_reduction_factor` when it is > 1.
Returns:
- int: Evaluation frequency (capped at total_timesteps)
+ Evaluation frequency in callback calls (`n_calls`).
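+
+ Example (illustrative numbers):
+ With `n_envs = 4` and PPO `n_steps = 2048`, evaluation runs every 2048
+ callback calls, i.e. every 2048 * 4 = 8192 environment timesteps.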
"""
if total_timesteps <= 0:
return 1
+
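+ # Work in callback calls: SB3 invokes the callback once per vectorized step,
+ # so the call budget is total_timesteps // n_envs.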
+ n_envs = self.n_envs
+ max_n_calls = max(1, total_timesteps // n_envs)
+
# "PPO"
if ReforceXY._MODEL_TYPES[0] in self.model_type:
eval_freq: Optional[int] = None
if model_params:
n_steps = model_params.get("n_steps")
if isinstance(n_steps, int) and n_steps > 0:
- eval_freq = max(1, n_steps)
+ eval_freq = max(1, min(n_steps, max_n_calls))
if eval_freq is None:
eval_freq = next(
(
step
for step in sorted(ReforceXY._PPO_N_STEPS, reverse=True)
- if step <= total_timesteps
+ if step <= max_n_calls
),
- ReforceXY._PPO_N_STEPS[0],
+ max_n_calls,
)
else:
- eval_freq = max(1, (self.n_eval_steps + self.n_envs - 1) // self.n_envs)
+ eval_freq = max(1, (self.n_eval_steps + n_envs - 1) // n_envs)
if hyperopt and hyperopt_reduction_factor > 1.0:
eval_freq = max(1, int(round(eval_freq / hyperopt_reduction_factor)))
- return min(eval_freq, total_timesteps)
+ return min(eval_freq, max_n_calls)
def get_callbacks(
self,
test_df = data_dictionary.get("test_features")
eval_timesteps = len(test_df)
train_cycles = max(1, int(self.rl_config.get("train_cycles", 25)))
- total_timesteps = (
- (train_timesteps * train_cycles + self.n_envs - 1) // self.n_envs
- ) * self.n_envs
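+ # Round the training budget up to a multiple of n_envs so vectorized
+ # environment steps divide it evenly.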
+ total_timesteps = ReforceXY._ceil_to_multiple(
+ train_timesteps * train_cycles, self.n_envs
+ )
train_days = steps_to_days(train_timesteps, self.config.get("timeframe"))
eval_days = steps_to_days(eval_timesteps, self.config.get("timeframe"))
total_days = steps_to_days(total_timesteps, self.config.get("timeframe"))
)
if n_steps > 0:
rollout = n_steps * self.n_envs
- aligned_total_timesteps = (
- (total_timesteps + rollout - 1) // rollout
- ) * rollout
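+ # Round up to whole PPO rollouts (n_steps * n_envs) so the final rollout
+ # is not truncated.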
+ aligned_total_timesteps = ReforceXY._ceil_to_multiple(
+ total_timesteps, rollout
+ )
if aligned_total_timesteps != total_timesteps:
total_timesteps = aligned_total_timesteps
logger.info(
reduction_factor=reduction_factor,
)
+ @staticmethod
+ def _ceil_to_multiple(value: int, multiple: int) -> int:
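+ """Round `value` up to the nearest multiple of `multiple`."""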
+ return ((value + multiple - 1) // multiple) * multiple
+
+ @staticmethod
+ def _ppo_resources(total_timesteps: int, n_envs: int) -> Tuple[int, int]:
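+ """Return pruner resource bounds, in environment timesteps, for PPO hyperopt.
+
+ Illustrative: with `n_envs = 4`, min_resource = round(512 / 4.0) * 4 = 512
+ timesteps, and max_resource rounds total_timesteps up to a multiple of the
+ largest rollout, 4096 * 4 = 16384 timesteps.
+ """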
+ min_n_steps = ReforceXY._PPO_N_STEPS_MIN
+ max_n_steps = ReforceXY._PPO_N_STEPS_MAX
+ min_resource = n_envs * max(
+ 1, round(min_n_steps / ReforceXY._HYPEROPT_EVAL_FREQ_REDUCTION_FACTOR)
+ )
+ rollout = max_n_steps * n_envs
+ return (
+ min_resource,
+ max(min_resource, ReforceXY._ceil_to_multiple(total_timesteps, rollout)),
+ )
+
def optimize(
self, dk: FreqaiDataKitchen, total_timesteps: int
) -> Optional[Dict[str, Any]]:
continuous = self.rl_config_optuna.get("continuous", False)
if continuous:
ReforceXY.delete_study(study_name, storage)
- # "PPO"
- if ReforceXY._MODEL_TYPES[0] in self.model_type:
- resource_eval_freq = min(ReforceXY._PPO_N_STEPS)
- else:
- resource_eval_freq = self.get_eval_freq(total_timesteps, hyperopt=True)
+
reduction_factor = 3
- max_resource = max(
- reduction_factor * 2, (total_timesteps // self.n_envs) // resource_eval_freq
- )
- min_resource = max(1, max_resource // reduction_factor)
+ n_envs = self.n_envs
+ if ReforceXY._MODEL_TYPES[0] in self.model_type: # "PPO"
+ min_resource, max_resource = ReforceXY._ppo_resources(
+ total_timesteps, n_envs
+ )
+ else:
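+ # get_eval_freq returns an interval in n_calls; scale by n_envs to
+ # express the pruner resource in timesteps.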
+ min_resource = self.get_eval_freq(total_timesteps, hyperopt=True) * n_envs
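+ # SB3 can overshoot total_timesteps by up to n_envs - 1 timesteps, so
+ # keep that headroom in max_resource.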
+ max_resource = max(min_resource, total_timesteps + (n_envs - 1))
study: Study = create_study(
study_name=study_name,
n_steps = params.get("n_steps", 0)
if n_steps > 0:
rollout = n_steps * self.n_envs
- aligned_total_timesteps = (
- (total_timesteps + rollout - 1) // rollout
- ) * rollout
+ aligned_total_timesteps = ReforceXY._ceil_to_multiple(
+ total_timesteps, rollout
+ )
if aligned_total_timesteps != total_timesteps:
total_timesteps = aligned_total_timesteps
"duration_ratio": (
trade_duration / max(1, self.max_trade_duration_candles)
),
- "trade_count": int(len(self.trade_history) // 2),
+ "trade_count": len(self.trade_history) // 2,
}
self._update_history(info)
terminated = self.is_terminated()
fig.suptitle(
f"Total Reward: {self.total_reward:.2f} ~ "
+ f"Total Profit: {self._total_profit:.2f} ~ "
- + f"Trades: {int(len(self.trade_history) // 2)}",
+ + f"Trades: {len(self.trade_history) // 2}",
)
fig.tight_layout()
return fig
aggregated_info[f"{k}_mode"] = "mixed"
self._safe_logger_record(
- "info/n_envs", int(len(infos_list)), exclude=logger_exclude
+ "info/n_envs", len(infos_list), exclude=logger_exclude
)
if filtered_values > 0:
def _on_step(self) -> bool:
if self.is_pruned:
return False
+
_super_on_step = super()._on_step()
if not _super_on_step:
return False
+
if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0:
self.eval_idx += 1
+
try:
last_mean_reward = float(getattr(self, "last_mean_reward", np.nan))
except Exception as e:
logger.warning(
- "Optuna: invalid last_mean_reward at eval %s: %r", self.eval_idx, e
+ "Optuna: invalid last_mean_reward (eval_idx=%s, timesteps=%s): %r",
+ self.eval_idx,
+ self.num_timesteps,
+ e,
)
self.is_pruned = True
return False
+
if not np.isfinite(last_mean_reward):
logger.warning(
- "Optuna: non-finite last_mean_reward at eval %s", self.eval_idx
+ "Optuna: non-finite last_mean_reward (eval_idx=%s, timesteps=%s)",
+ self.eval_idx,
+ self.num_timesteps,
)
self.is_pruned = True
return False
+
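+ # Report at num_timesteps: the step axis is monotonic and matches the
+ # timestep-based resource bounds configured in optimize().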
try:
- self.trial.report(last_mean_reward, self.eval_idx)
+ self.trial.report(last_mean_reward, self.num_timesteps)
except Exception as e:
logger.warning(
- "Optuna: trial.report failed at eval %s: %r", self.eval_idx, e
+ "Optuna: trial.report failed (eval_idx=%s, timesteps=%s): %r",
+ self.eval_idx,
+ self.num_timesteps,
+ e,
)
self.is_pruned = True
return False
+
try:
best_mean_reward = float(getattr(self, "best_mean_reward", np.nan))
except Exception as e:
logger.warning(
- "Optuna: invalid best_mean_reward at eval %s: %r",
+ "Optuna: invalid best_mean_reward (eval_idx=%s, timesteps=%s): %r",
self.eval_idx,
+ self.num_timesteps,
e,
)
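+ # Fall back to NaN: the finiteness check below then warns instead of
+ # recording a bogus best_mean_reward.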
+ best_mean_reward = np.nan
+
try:
logger_exclude = ("stdout", "log", "json", "csv")
+ self.logger.record("eval/idx", self.eval_idx, exclude=logger_exclude)
self.logger.record(
- "eval/idx",
- int(self.eval_idx),
- exclude=logger_exclude,
+ "eval/num_timesteps", self.num_timesteps, exclude=logger_exclude
)
self.logger.record(
- "eval/last_mean_reward",
- last_mean_reward,
- exclude=logger_exclude,
+ "eval/last_mean_reward", last_mean_reward, exclude=logger_exclude
)
if np.isfinite(best_mean_reward):
self.logger.record(
)
else:
logger.warning(
- "Optuna: non-finite best_mean_reward at eval %s", self.eval_idx
+ "Optuna: non-finite best_mean_reward (eval_idx=%s, timesteps=%s)",
+ self.eval_idx,
+ self.num_timesteps,
)
except Exception as e:
logger.error(
- "Optuna: logger.record failed at eval %s: %r", self.eval_idx, e
+ "Optuna: logger.record failed (eval_idx=%s, timesteps=%s): %r",
+ self.eval_idx,
+ self.num_timesteps,
+ e,
)
+
try:
if self.trial.should_prune():
logger.info(
- "Optuna: trial pruned at eval %s (score=%.5f)",
+ "Optuna: trial pruned (eval_idx=%s, timesteps=%s, score=%.5f)",
self.eval_idx,
+ self.num_timesteps,
last_mean_reward,
)
self.is_pruned = True
return False
except Exception as e:
logger.warning(
- "Optuna: trial.should_prune failed at eval %s: %r", self.eval_idx, e
+ "Optuna: trial.should_prune failed (eval_idx=%s, timesteps=%s): %r",
+ self.eval_idx,
+ self.num_timesteps,
+ e,
)
self.is_pruned = True
return False