From 7c24fcea4f97566b20cb59ae1627c1e44ea9de08 Mon Sep 17 00:00:00 2001
From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?=
Date: Sun, 28 Sep 2025 16:00:21 +0200
Subject: [PATCH] perf(reforcexy): optimize prediction code path
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Convert the feature dataframe to a float32 ndarray once and slice
prediction windows from it, instead of materializing and converting a
pandas DataFrame per window. Reuse a precomputed state block when not
running live, cache the zero frame used to pad frame stacking, and drop
the per-step observation copy.

Also: apply the hyperopt eval frequency reduction after the per-env
ceiling division, log the eval episode count, validate
tensorboard_throttle, include the offending values in TrialPruned
messages, and record n_updates and exploration_rate to tensorboard.

Signed-off-by: Jérôme Benoit
---
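Notes (dropped by git-am): the core of the prediction-path change,
reduced to a standalone sketch. The helper names below are illustrative
only and do not appear in the patch; the real loop also appends state
info and handles frame stacking.

    import numpy as np
    import pandas as pd

    def predict_windows_slow(dataframe: pd.DataFrame, window: int):
        # old pattern: one DataFrame slice + dtype conversion per window
        return [
            dataframe.iloc[end - window : end].to_numpy(dtype=np.float32)
            for end in range(window, len(dataframe) + 1)
        ]

    def predict_windows_fast(dataframe: pd.DataFrame, window: int):
        # new pattern: a single float32 conversion up front, then
        # zero-copy ndarray views per window
        np_dataframe = dataframe.to_numpy(dtype=np.float32, copy=False)
        return [
            np_dataframe[start : start + window, :]
            for start in range(0, len(np_dataframe) - window + 1)
        ]

Both variants yield one observation per closed window; the fast one
avoids pandas indexing overhead and repeated dtype conversion in the
hot loop.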
 ReforceXY/user_data/freqaimodels/ReforceXY.py | 185 ++++++++++++------
 1 file changed, 122 insertions(+), 63 deletions(-)

diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py
index a79f1e4..b3c3a6e 100644
--- a/ReforceXY/user_data/freqaimodels/ReforceXY.py
+++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py
@@ -287,6 +287,13 @@ class ReforceXY(BaseReinforcementLearningModel):
                 self.n_eval_episodes,
             )
             self.n_eval_episodes = 5
+        tensorboard_throttle = self.rl_config.get("tensorboard_throttle", 1)
+        if not isinstance(tensorboard_throttle, int) or tensorboard_throttle < 1:
+            logger.warning(
+                "Invalid tensorboard_throttle=%s. Forcing tensorboard_throttle=1",
+                tensorboard_throttle,
+            )
+            self.rl_config["tensorboard_throttle"] = 1
         if self.continual_learning and self.frame_stacking:
             logger.warning(
                 "User tried to use continual_learning with frame_stacking. \
@@ -456,7 +463,7 @@ class ReforceXY(BaseReinforcementLearningModel):
         if total_timesteps <= 0:
             return 1
         if "PPO" in self.model_type:
-            eval_freq = None
+            eval_freq: Optional[int] = None
             if model_params:
                 n_steps = model_params.get("n_steps")
                 if isinstance(n_steps, int) and n_steps > 0:
@@ -471,11 +478,10 @@ class ReforceXY(BaseReinforcementLearningModel):
                         PPO_N_STEPS[0],
                     )
         else:
-            if hyperopt and hyperopt_reduction_factor > 1.0:
-                eval_freq = int(self.n_eval_steps / hyperopt_reduction_factor)
-            else:
-                eval_freq = self.n_eval_steps
-        eval_freq = max(1, (eval_freq + self.n_envs - 1) // self.n_envs)
+            eval_freq = max(1, (self.n_eval_steps + self.n_envs - 1) // self.n_envs)
+
+        if hyperopt and hyperopt_reduction_factor > 1.0:
+            eval_freq = max(1, int(round(eval_freq / hyperopt_reduction_factor)))

         return min(eval_freq, total_timesteps)

@@ -581,9 +587,10 @@ class ReforceXY(BaseReinforcementLearningModel):
                 total_days,
             )
             logger.info(
-                "Eval: %s steps (%s days), %s env(s)",
+                "Eval: %s steps (%s days), %s episodes, %s env(s)",
                 eval_timesteps,
                 eval_days,
+                self.n_eval_episodes,
                 self.n_eval_envs,
             )
             logger.info("Multiprocessing: %s", self.multiprocessing)
@@ -694,6 +701,16 @@ class ReforceXY(BaseReinforcementLearningModel):

         """
         virtual_position: Positions = Positions.Neutral
+        np_dataframe: NDArray[np.float32] = dataframe.to_numpy(
+            dtype=np.float32, copy=False
+        )
+        n = int(np_dataframe.shape[0])
+        window_length = int(self.CONV_WIDTH)
+        if self.rl_config.get("add_state_info", False) and not self.live:
+            static_state_block = np.tile(
+                np.array([0.0, float(Positions.Neutral.value), 0.0], dtype=np.float32),
+                (window_length, 1),
+            )

         def _update_virtual_position(action: int, position: Positions) -> Positions:
             if action == Actions.Long_enter.value and position == Positions.Neutral:
@@ -707,36 +724,40 @@ class ReforceXY(BaseReinforcementLearningModel):
                 return position

         frame_buffer: List[NDArray[np.float32]] = []
+        zero_frame: Optional[NDArray[np.float32]] = None

-        def _predict(window) -> int:
-            observation: DataFrame = dataframe.iloc[window.index]
+        def _predict(start_idx: int) -> int:
+            nonlocal zero_frame
+            end_idx = start_idx + window_length
+            np_observation = np_dataframe[start_idx:end_idx, :]
             action_masks_param: Dict[str, Any] = {}

             if self.rl_config.get("add_state_info", False):
                 if self.live:
                     position, pnl, trade_duration = self.get_state_info(dk.pair)
+                    position = ReforceXY._normalize_position(position)
+                    state_block = np.tile(
+                        np.array(
+                            [float(pnl), float(position.value), float(trade_duration)],
+                            dtype=np.float32,
+                        ),
+                        (np_observation.shape[0], 1),
+                    )
                 else:
-                    position = Positions.Neutral
-                    pnl = 0.0
-                    trade_duration = 0
-
-                # STATE_INFO
-                observation["pnl"] = pnl
-                observation["position"] = position
-                observation["trade_duration"] = trade_duration
-
-                np_observation = observation.to_numpy(dtype=np.float32)
+                    state_block = static_state_block
+                np_observation = np.concatenate([np_observation, state_block], axis=1)

             fb: List[NDArray[np.float32]] = frame_buffer
             frame_stacking = self.frame_stacking
             if frame_stacking and frame_stacking > 1:
-                fb.append(np_observation.copy())
+                fb.append(np_observation)
                 if len(fb) > frame_stacking:
                     del fb[0 : len(fb) - frame_stacking]
                 if len(fb) < frame_stacking:
                     pad_count = frame_stacking - len(fb)
-                    pad_frame = np.zeros_like(np_observation, dtype=np.float32)
-                    fb_padded = [pad_frame] * pad_count + fb
+                    if zero_frame is None:
+                        zero_frame = np.zeros_like(np_observation, dtype=np.float32)
+                    fb_padded = [zero_frame] * pad_count + fb
                 else:
                     fb_padded = fb
                 stacked_observations = np.concatenate(fb_padded, axis=1)
@@ -759,14 +780,13 @@ class ReforceXY(BaseReinforcementLearningModel):
             return int(action)

         predicted_actions: List[int] = []
-        for window_end in range(self.CONV_WIDTH, len(dataframe) + 1):
-            window = dataframe.iloc[window_end - self.CONV_WIDTH : window_end]
-            action = _predict(window)
+        for start_idx in range(0, n - window_length + 1):
+            action = _predict(start_idx)
             predicted_actions.append(action)
             virtual_position = _update_virtual_position(action, virtual_position)

-        pad = [np.nan] * (self.CONV_WIDTH - 1)
-        actions_list = pad + predicted_actions
+        pad_count = n - len(predicted_actions)
+        actions_list = ([np.nan] * pad_count) + predicted_actions
         actions = DataFrame({"action": actions_list}, index=dataframe.index)

         return DataFrame({label: actions["action"] for label in dk.label_list})
@@ -827,9 +847,9 @@ class ReforceXY(BaseReinforcementLearningModel):
         resource_eval_freq = self.get_eval_freq(total_timesteps, hyperopt=True)
         reduction_factor = 3
         max_resource = max(
-            reduction_factor * 2, total_timesteps // (resource_eval_freq * self.n_envs)
+            reduction_factor * 2, (total_timesteps // self.n_envs) // resource_eval_freq
         )
-        min_resource = min(reduction_factor, max_resource // reduction_factor)
+        min_resource = max(1, max_resource // reduction_factor)
         study: Study = create_study(
             study_name=study_name,
             sampler=TPESampler(
@@ -1053,8 +1073,11 @@ class ReforceXY(BaseReinforcementLearningModel):

         if "PPO" in self.model_type:
             params = sample_params_ppo(trial, self.n_envs)
-            if params.get("n_steps", 0) * self.n_envs > total_timesteps:
-                raise TrialPruned("n_steps * n_envs is greater than total_timesteps")
+            n_steps = params.get("n_steps", 0)
+            if n_steps * self.n_envs > total_timesteps:
+                raise TrialPruned(
+                    f"{n_steps=} * n_envs={self.n_envs}={n_steps * self.n_envs} is greater than {total_timesteps=}"
+                )
         elif "QRDQN" in self.model_type:
             params = sample_params_qrdqn(trial)
         elif "DQN" in self.model_type:
@@ -1065,12 +1088,12 @@ class ReforceXY(BaseReinforcementLearningModel):
         if "DQN" in self.model_type:
             gradient_steps = params.get("gradient_steps")
             if isinstance(gradient_steps, int) and gradient_steps <= 0:
-                raise TrialPruned("gradient_steps is negative or zero")
+                raise TrialPruned(f"{gradient_steps=} is negative or zero")
TrialPruned(f"{gradient_steps=} is negative or zero") batch_size = params.get("batch_size") buffer_size = params.get("buffer_size") if (batch_size * gradient_steps) > buffer_size: raise TrialPruned( - "batch_size * gradient_steps is greater than buffer_size" + f"{batch_size=} * {gradient_steps=}={batch_size * gradient_steps} is greater than {buffer_size=}" ) # Ensure that the sampled parameters take precedence @@ -1275,7 +1298,7 @@ class MyRLEnv(Base5ActionRLEnv): self._last_closed_trade_tick: int = 0 return observation, history - def _get_exit_reward_factor( + def _get_exit_factor( self, factor: float, pnl: float, @@ -1423,7 +1446,7 @@ class MyRLEnv(Base5ActionRLEnv): ForceActions.Stop_loss, ForceActions.Timeout, ): - return pnl * self._get_exit_reward_factor(base_factor, pnl, duration_ratio) + return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio) # # you can use feature values from dataframe # rsi_now = self.get_feature_value( @@ -1507,11 +1530,11 @@ class MyRLEnv(Base5ActionRLEnv): # close long if action == Actions.Long_exit.value and self._position == Positions.Long: - return pnl * self._get_exit_reward_factor(base_factor, pnl, duration_ratio) + return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio) # close short if action == Actions.Short_exit.value and self._position == Positions.Short: - return pnl * self._get_exit_reward_factor(base_factor, pnl, duration_ratio) + return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio) return 0.0 @@ -1832,21 +1855,18 @@ class MyRLEnv(Base5ActionRLEnv): logger.warning("history is empty") return DataFrame() - _history_df = DataFrame.from_dict(self.history) + _history_df = DataFrame(self.history) if "tick" not in _history_df.columns: logger.warning("'tick' column is missing from history") return DataFrame() + _rollout_history = _history_df.copy() if self.trade_history: - _trade_history_df = DataFrame.from_dict(self.trade_history) + _trade_history_df = DataFrame(self.trade_history) if "tick" in _trade_history_df.columns: _rollout_history = merge( - _history_df, _trade_history_df, on="tick", how="left" + _rollout_history, _trade_history_df, on="tick", how="left" ) - else: - _rollout_history = _history_df.copy() - else: - _rollout_history = _history_df.copy() try: history = merge( @@ -1898,7 +1918,7 @@ class MyRLEnv(Base5ActionRLEnv): if len(history) == 0: return fig - plot_window = int(self.rl_config.get("plot_window", 2000)) + plot_window = self.rl_config.get("plot_window", 2000) if plot_window > 0 and len(history) > plot_window: history = history.iloc[-plot_window:] @@ -2091,6 +2111,14 @@ class InfoMetricsCallback(TensorboardCallback): "gamma": float(self.model.gamma), "batch_size": int(self.model.batch_size), } + try: + n_updates = getattr(self.model, "n_updates", None) + if n_updates is None: + n_updates = getattr(self.model, "_n_updates", None) + if isinstance(n_updates, (int, float)) and np.isfinite(n_updates): + hparam_dict.update({"n_updates": int(n_updates)}) + except Exception: + pass if "PPO" in self.model.__class__.__name__: cr = getattr(self.model, "clip_range", None) cr_schedule, cr_iv, cr_fv = get_schedule_type(cr) @@ -2174,6 +2202,8 @@ class InfoMetricsCallback(TensorboardCallback): if self.throttle > 1 and (self.num_timesteps % self.throttle) != 0: return True + logger_exclude = ("stdout", "log", "json", "csv") + def _is_numeric_non_bool(x: Any) -> bool: return isinstance( x, (int, float, np.integer, np.floating) @@ -2249,7 +2279,6 @@ class InfoMetricsCallback(TensorboardCallback): else: 
aggregated_info[f"{k}_mode"] = "mixed" - logger_exclude = ("stdout", "log", "json", "csv") self._safe_logger_record( "info/n_envs", int(len(infos_list)), exclude=logger_exclude ) @@ -2313,6 +2342,7 @@ class InfoMetricsCallback(TensorboardCallback): for k, counts in cat_counts.items(): cat_total = max(1, int(cat_totals.get(k, 0))) for name, cnt in counts.items(): + name = str(name) self._safe_logger_record( f"info/{k}/{name}_count", int(cnt), exclude=logger_exclude ) @@ -2359,6 +2389,17 @@ class InfoMetricsCallback(TensorboardCallback): except Exception: progress_remaining = 1.0 + try: + n_updates = getattr(self.model, "n_updates", None) + if n_updates is None: + n_updates = getattr(self.model, "_n_updates", None) + if _is_finite_number(n_updates): + self._safe_logger_record( + "train/n_updates", float(n_updates), exclude=logger_exclude + ) + except Exception: + pass + def _eval_schedule(schedule: Any) -> float | None: schedule_type, _, _ = get_schedule_type(schedule) try: @@ -2394,6 +2435,16 @@ class InfoMetricsCallback(TensorboardCallback): except Exception: pass + if "DQN" in self.model.__class__.__name__: + try: + er = getattr(self.model, "exploration_rate", None) + if _is_finite_number(er): + self._safe_logger_record( + "train/exploration_rate", float(er), exclude=logger_exclude + ) + except Exception: + pass + return True @@ -2496,23 +2547,31 @@ class MaskableTrialEvalCallback(MaskableEvalCallback): self.eval_idx, e, ) - if np.isfinite(best_mean_reward): - try: + try: + logger_exclude = ("stdout", "log", "json", "csv") + self.logger.record( + "eval/idx", + int(self.eval_idx), + exclude=logger_exclude, + ) + self.logger.record( + "eval/last_mean_reward", + last_mean_reward, + exclude=logger_exclude, + ) + if np.isfinite(best_mean_reward): self.logger.record( "eval/best_mean_reward", best_mean_reward, - exclude=("stdout", "log", "json", "csv"), + exclude=logger_exclude, ) - except Exception as e: - logger.error( - "Optuna: logger.record failed at %r: %r", - "eval/best_mean_reward", - e, + else: + logger.warning( + "Optuna: non-finite best_mean_reward at eval %s", self.eval_idx ) - pass - else: - logger.warning( - "Optuna: non-finite best_mean_reward at eval %s", self.eval_idx + except Exception as e: + logger.error( + "Optuna: logger.record failed at eval %s: %r", self.eval_idx, e ) try: if self.trial.should_prune(): @@ -2780,10 +2839,10 @@ def sample_params_ppo(trial: Trial, n_envs: int) -> Dict[str, Any]: """ n_steps = trial.suggest_categorical("n_steps", list(PPO_N_STEPS)) batch_size = trial.suggest_categorical("batch_size", [64, 128, 256, 512, 1024]) - if batch_size > n_steps: - raise TrialPruned("batch_size is greater than n_steps") if (n_steps * n_envs) % batch_size != 0: - raise TrialPruned("n_steps * n_envs is not divisible by batch_size") + raise TrialPruned( + f"{n_steps=} * {n_envs=} = {n_steps * n_envs} is not divisible by {batch_size=}" + ) return convert_optuna_params_to_model_params( "PPO", { @@ -2842,8 +2901,8 @@ def get_common_dqn_optuna_params(trial: Trial) -> Dict[str, Any]: learning_starts = trial.suggest_categorical( "learning_starts", [500, 1000, 2000, 3000, 4000, 5000, 8000, 10000] ) - if learning_starts >= buffer_size: - raise TrialPruned("learning_starts is greater than or equal to buffer_size") + if learning_starts > buffer_size: + raise TrialPruned(f"{learning_starts=} is greater than {buffer_size=}") return { "train_freq": trial.suggest_categorical( "train_freq", [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024] -- 2.43.0