self.n_eval_episodes,
)
self.n_eval_episodes = 5
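+ # Guard tensorboard_throttle: anything that is not a positive int falls back to 1 (log every call)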
+ tensorboard_throttle = self.rl_config.get("tensorboard_throttle", 1)
+ if not isinstance(tensorboard_throttle, int) or tensorboard_throttle < 1:
+ logger.warning(
+ "Invalid tensorboard_throttle=%s. Forcing tensorboard_throttle=1",
+ tensorboard_throttle,
+ )
+ self.rl_config["tensorboard_throttle"] = 1
if self.continual_learning and self.frame_stacking:
logger.warning(
"User tried to use continual_learning with frame_stacking. \
if total_timesteps <= 0:
return 1
if "PPO" in self.model_type:
- eval_freq = None
+ eval_freq: Optional[int] = None
if model_params:
n_steps = model_params.get("n_steps")
if isinstance(n_steps, int) and n_steps > 0:
PPO_N_STEPS[0],
)
else:
- if hyperopt and hyperopt_reduction_factor > 1.0:
- eval_freq = int(self.n_eval_steps / hyperopt_reduction_factor)
- else:
- eval_freq = self.n_eval_steps
- eval_freq = max(1, (eval_freq + self.n_envs - 1) // self.n_envs)
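+ # n_eval_steps counts total environment steps; ceil-divide by n_envs so
+ # eval_freq is expressed in per-env callback steps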
+ eval_freq = max(1, (self.n_eval_steps + self.n_envs - 1) // self.n_envs)
+
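+ # Under hyperopt, evaluate more frequently so the pruner gets intermediate values sooner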
+ if hyperopt and hyperopt_reduction_factor > 1.0:
+ eval_freq = max(1, int(round(eval_freq / hyperopt_reduction_factor)))
return min(eval_freq, total_timesteps)
total_days,
)
logger.info(
- "Eval: %s steps (%s days), %s env(s)",
+ "Eval: %s steps (%s days), %s episodes, %s env(s)",
eval_timesteps,
eval_days,
+ self.n_eval_episodes,
self.n_eval_envs,
)
logger.info("Multiprocessing: %s", self.multiprocessing)
"""
virtual_position: Positions = Positions.Neutral
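+ # Convert once to a float32 ndarray up front; avoids repeated DataFrame slicing in the predict loop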
+ np_dataframe: NDArray[np.float32] = dataframe.to_numpy(
+ dtype=np.float32, copy=False
+ )
+ n = int(np_dataframe.shape[0])
+ window_length = int(self.CONV_WIDTH)
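+ # Backtesting has no live broker state: pre-build one constant
+ # [pnl, position, trade_duration] block (zero pnl/duration, neutral position) and reuse it for every window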
+ if self.rl_config.get("add_state_info", False) and not self.live:
+ static_state_block = np.tile(
+ np.array([0.0, float(Positions.Neutral.value), 0.0], dtype=np.float32),
+ (window_length, 1),
+ )
def _update_virtual_position(action: int, position: Positions) -> Positions:
if action == Actions.Long_enter.value and position == Positions.Neutral:
return position
frame_buffer: List[NDArray[np.float32]] = []
+ zero_frame: Optional[NDArray[np.float32]] = None
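+ # Cached all-zero observation used to left-pad the frame stack during warm-up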
- def _predict(window) -> int:
- observation: DataFrame = dataframe.iloc[window.index]
+ def _predict(start_idx: int) -> int:
+ nonlocal zero_frame
+ end_idx = start_idx + window_length
+ np_observation = np_dataframe[start_idx:end_idx, :]
action_masks_param: Dict[str, Any] = {}
if self.rl_config.get("add_state_info", False):
if self.live:
position, pnl, trade_duration = self.get_state_info(dk.pair)
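+ # Normalize the broker-reported position into a canonical Positions member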
+ position = ReforceXY._normalize_position(position)
+ state_block = np.tile(
+ np.array(
+ [float(pnl), float(position.value), float(trade_duration)],
+ dtype=np.float32,
+ ),
+ (np_observation.shape[0], 1),
+ )
else:
- position = Positions.Neutral
- pnl = 0.0
- trade_duration = 0
-
- # STATE_INFO
- observation["pnl"] = pnl
- observation["position"] = position
- observation["trade_duration"] = trade_duration
-
- np_observation = observation.to_numpy(dtype=np.float32)
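+ # Backtest path: reuse the constant neutral-state block built above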
+ state_block = static_state_block
+ np_observation = np.concatenate([np_observation, state_block], axis=1)
fb: List[NDArray[np.float32]] = frame_buffer
frame_stacking = self.frame_stacking
if frame_stacking and frame_stacking > 1:
- fb.append(np_observation.copy())
+ fb.append(np_observation)
if len(fb) > frame_stacking:
del fb[0 : len(fb) - frame_stacking]
if len(fb) < frame_stacking:
pad_count = frame_stacking - len(fb)
- pad_frame = np.zeros_like(np_observation, dtype=np.float32)
- fb_padded = [pad_frame] * pad_count + fb
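+ # Allocate the zero padding frame once and reuse it across calls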
+ if zero_frame is None:
+ zero_frame = np.zeros_like(np_observation, dtype=np.float32)
+ fb_padded = [zero_frame] * pad_count + fb
else:
fb_padded = fb
stacked_observations = np.concatenate(fb_padded, axis=1)
return int(action)
predicted_actions: List[int] = []
- for window_end in range(self.CONV_WIDTH, len(dataframe) + 1):
- window = dataframe.iloc[window_end - self.CONV_WIDTH : window_end]
- action = _predict(window)
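+ # Iterate over row indices and slice the ndarray directly instead of building a DataFrame window per step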
+ for start_idx in range(0, n - window_length + 1):
+ action = _predict(start_idx)
predicted_actions.append(action)
virtual_position = _update_virtual_position(action, virtual_position)
- pad = [np.nan] * (self.CONV_WIDTH - 1)
- actions_list = pad + predicted_actions
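+ # The first window_length - 1 rows never see a full window; pad their actions with NaN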
+ pad_count = n - len(predicted_actions)
+ actions_list = ([np.nan] * pad_count) + predicted_actions
actions = DataFrame({"action": actions_list}, index=dataframe.index)
return DataFrame({label: actions["action"] for label in dk.label_list})
resource_eval_freq = self.get_eval_freq(total_timesteps, hyperopt=True)
reduction_factor = 3
max_resource = max(
- reduction_factor * 2, total_timesteps // (resource_eval_freq * self.n_envs)
+ reduction_factor * 2, (total_timesteps // self.n_envs) // resource_eval_freq
)
- min_resource = min(reduction_factor, max_resource // reduction_factor)
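+ # Floor min_resource at 1 so the pruner always receives a usable budget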
+ min_resource = max(1, max_resource // reduction_factor)
study: Study = create_study(
study_name=study_name,
sampler=TPESampler(
if "PPO" in self.model_type:
params = sample_params_ppo(trial, self.n_envs)
- if params.get("n_steps", 0) * self.n_envs > total_timesteps:
- raise TrialPruned("n_steps * n_envs is greater than total_timesteps")
+ n_steps = params.get("n_steps", 0)
+ if n_steps * self.n_envs > total_timesteps:
+ raise TrialPruned(
+ f"{n_steps=} * n_envs={self.n_envs}={n_steps * self.n_envs} is greater than {total_timesteps=}"
+ )
elif "QRDQN" in self.model_type:
params = sample_params_qrdqn(trial)
elif "DQN" in self.model_type:
if "DQN" in self.model_type:
gradient_steps = params.get("gradient_steps")
if isinstance(gradient_steps, int) and gradient_steps <= 0:
- raise TrialPruned("gradient_steps is negative or zero")
+ raise TrialPruned(f"{gradient_steps=} is negative or zero")
batch_size = params.get("batch_size")
buffer_size = params.get("buffer_size")
if (batch_size * gradient_steps) > buffer_size:
raise TrialPruned(
- "batch_size * gradient_steps is greater than buffer_size"
+ f"{batch_size=} * {gradient_steps=}={batch_size * gradient_steps} is greater than {buffer_size=}"
)
# Ensure that the sampled parameters take precedence
self._last_closed_trade_tick: int = 0
return observation, history
- def _get_exit_reward_factor(
+ def _get_exit_factor(
self,
factor: float,
pnl: float,
ForceActions.Stop_loss,
ForceActions.Timeout,
):
- return pnl * self._get_exit_reward_factor(base_factor, pnl, duration_ratio)
+ return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
# # you can use feature values from dataframe
# rsi_now = self.get_feature_value(
# close long
if action == Actions.Long_exit.value and self._position == Positions.Long:
- return pnl * self._get_exit_reward_factor(base_factor, pnl, duration_ratio)
+ return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
# close short
if action == Actions.Short_exit.value and self._position == Positions.Short:
- return pnl * self._get_exit_reward_factor(base_factor, pnl, duration_ratio)
+ return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
return 0.0
logger.warning("history is empty")
return DataFrame()
- _history_df = DataFrame.from_dict(self.history)
+ _history_df = DataFrame(self.history)
if "tick" not in _history_df.columns:
logger.warning("'tick' column is missing from history")
return DataFrame()
+ _rollout_history = _history_df.copy()
if self.trade_history:
- _trade_history_df = DataFrame.from_dict(self.trade_history)
+ _trade_history_df = DataFrame(self.trade_history)
if "tick" in _trade_history_df.columns:
_rollout_history = merge(
- _history_df, _trade_history_df, on="tick", how="left"
+ _rollout_history, _trade_history_df, on="tick", how="left"
)
- else:
- _rollout_history = _history_df.copy()
- else:
- _rollout_history = _history_df.copy()
try:
history = merge(
if len(history) == 0:
return fig
- plot_window = int(self.rl_config.get("plot_window", 2000))
+ plot_window = self.rl_config.get("plot_window", 2000)
if plot_window > 0 and len(history) > plot_window:
history = history.iloc[-plot_window:]
"gamma": float(self.model.gamma),
"batch_size": int(self.model.batch_size),
}
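+ # SB3 keeps the update counter private (_n_updates) on most algorithms; probe the public name first, then fall back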
+ try:
+ n_updates = getattr(self.model, "n_updates", None)
+ if n_updates is None:
+ n_updates = getattr(self.model, "_n_updates", None)
+ if _is_finite_number(n_updates):
+ hparam_dict["n_updates"] = int(n_updates)
+ except Exception:
+ pass
if "PPO" in self.model.__class__.__name__:
cr = getattr(self.model, "clip_range", None)
cr_schedule, cr_iv, cr_fv = get_schedule_type(cr)
if self.throttle > 1 and (self.num_timesteps % self.throttle) != 0:
return True
+ logger_exclude = ("stdout", "log", "json", "csv")
+
def _is_numeric_non_bool(x: Any) -> bool:
return isinstance(
x, (int, float, np.integer, np.floating)
else:
aggregated_info[f"{k}_mode"] = "mixed"
- logger_exclude = ("stdout", "log", "json", "csv")
self._safe_logger_record(
"info/n_envs", int(len(infos_list)), exclude=logger_exclude
)
for k, counts in cat_counts.items():
cat_total = max(1, int(cat_totals.get(k, 0)))
for name, cnt in counts.items():
+ name = str(name)
self._safe_logger_record(
f"info/{k}/{name}_count", int(cnt), exclude=logger_exclude
)
except Exception:
progress_remaining = 1.0
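+ # Same probe as in the hparams block: public n_updates first, then SB3's private _n_updates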
+ try:
+ n_updates = getattr(self.model, "n_updates", None)
+ if n_updates is None:
+ n_updates = getattr(self.model, "_n_updates", None)
+ if _is_finite_number(n_updates):
+ self._safe_logger_record(
+ "train/n_updates", float(n_updates), exclude=logger_exclude
+ )
+ except Exception:
+ pass
+
def _eval_schedule(schedule: Any) -> float | None:
schedule_type, _, _ = get_schedule_type(schedule)
try:
except Exception:
pass
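+ # DQN-family models expose exploration_rate (epsilon); log it when finite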
+ if "DQN" in self.model.__class__.__name__:
+ try:
+ er = getattr(self.model, "exploration_rate", None)
+ if _is_finite_number(er):
+ self._safe_logger_record(
+ "train/exploration_rate", float(er), exclude=logger_exclude
+ )
+ except Exception:
+ pass
+
return True
self.eval_idx,
e,
)
- if np.isfinite(best_mean_reward):
- try:
+ try:
+ logger_exclude = ("stdout", "log", "json", "csv")
+ self.logger.record(
+ "eval/idx",
+ int(self.eval_idx),
+ exclude=logger_exclude,
+ )
+ self.logger.record(
+ "eval/last_mean_reward",
+ last_mean_reward,
+ exclude=logger_exclude,
+ )
+ if np.isfinite(best_mean_reward):
self.logger.record(
"eval/best_mean_reward",
best_mean_reward,
- exclude=("stdout", "log", "json", "csv"),
+ exclude=logger_exclude,
)
- except Exception as e:
- logger.error(
- "Optuna: logger.record failed at %r: %r",
- "eval/best_mean_reward",
- e,
+ else:
+ logger.warning(
+ "Optuna: non-finite best_mean_reward at eval %s", self.eval_idx
)
- pass
- else:
- logger.warning(
- "Optuna: non-finite best_mean_reward at eval %s", self.eval_idx
+ except Exception as e:
+ logger.error(
+ "Optuna: logger.record failed at eval %s: %r", self.eval_idx, e
)
try:
if self.trial.should_prune():
"""
n_steps = trial.suggest_categorical("n_steps", list(PPO_N_STEPS))
batch_size = trial.suggest_categorical("batch_size", [64, 128, 256, 512, 1024])
- if batch_size > n_steps:
- raise TrialPruned("batch_size is greater than n_steps")
if (n_steps * n_envs) % batch_size != 0:
- raise TrialPruned("n_steps * n_envs is not divisible by batch_size")
+ raise TrialPruned(
+ f"{n_steps=} * {n_envs=} = {n_steps * n_envs} is not divisible by {batch_size=}"
+ )
return convert_optuna_params_to_model_params(
"PPO",
{
learning_starts = trial.suggest_categorical(
"learning_starts", [500, 1000, 2000, 3000, 4000, 5000, 8000, 10000]
)
- if learning_starts >= buffer_size:
- raise TrialPruned("learning_starts is greater than or equal to buffer_size")
+ if learning_starts > buffer_size:
+ raise TrialPruned(f"{learning_starts=} is greater than {buffer_size=}")
return {
"train_freq": trial.suggest_categorical(
"train_freq", [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]