From 0f50f91cabb856485ba642855bc2ad20a0c86bf6 Mon Sep 17 00:00:00 2001
From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?=
Date: Sat, 20 Sep 2025 21:24:08 +0200
Subject: [PATCH] perf(reforcexy): isolate and seed HPO envs
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Signed-off-by: Jérôme Benoit
---
 ReforceXY/user_data/freqaimodels/ReforceXY.py | 171 +++++++++++-------
 1 file changed, 108 insertions(+), 63 deletions(-)

diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py
index e3eb088..d98708b 100644
--- a/ReforceXY/user_data/freqaimodels/ReforceXY.py
+++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py
@@ -248,7 +248,6 @@ class ReforceXY(BaseReinforcementLearningModel):
         test_df = data_dictionary.get("test_features")
         env_dict = self.pack_env_dict(dk.pair)
         seed = self.get_model_params().get("seed", 42)
-        set_random_seed(seed)
 
         if self.check_envs:
             logger.info("Checking environments")
@@ -276,49 +275,9 @@ class ReforceXY(BaseReinforcementLearningModel):
             _eval_env_check.close()
 
         logger.info("Populating environments: %s", self.n_envs)
-        train_fns = [
-            make_env(
-                MyRLEnv,
-                f"train_env{i}",
-                i,
-                seed,
-                train_df,
-                prices_train,
-                env_info=env_dict,
-            )
-            for i in range(self.n_envs)
-        ]
-        eval_fns = [
-            make_env(
-                MyRLEnv,
-                f"eval_env{i}",
-                i,
-                seed + 10_000,
-                test_df,
-                prices_test,
-                env_info=env_dict,
-            )
-            for i in range(self.n_envs)
-        ]
-
-        logger.info("Multiprocessing: %s", self.multiprocessing)
-        if self.multiprocessing and self.n_envs > 1:
-            train_env = SubprocVecEnv(train_fns, start_method="spawn")
-            eval_env = SubprocVecEnv(eval_fns, start_method="spawn")
-        else:
-            train_env = DummyVecEnv(train_fns)
-            eval_env = DummyVecEnv(eval_fns)
-
-        train_env = VecMonitor(train_env)
-        eval_env = VecMonitor(eval_env)
-
-        if self.frame_stacking:
-            logger.info("Frame stacking: %s", self.frame_stacking)
-            train_env = VecFrameStack(train_env, n_stack=self.frame_stacking)
-            eval_env = VecFrameStack(eval_env, n_stack=self.frame_stacking)
-
-        self.train_env = train_env
-        self.eval_env = eval_env
+        self.train_env, self.eval_env = self._get_train_and_eval_environments(
+            train_df, test_df, dk, prices_train, prices_test, seed, env_dict
+        )
 
     def get_model_params(self) -> Dict[str, Any]:
         """
@@ -439,7 +398,11 @@ class ReforceXY(BaseReinforcementLearningModel):
         return max(1, min(eval_freq, train_timesteps))
 
     def get_callbacks(
-        self, eval_freq: int, data_path: str, trial: Optional[Trial] = None
+        self,
+        eval_env: BaseEnvironment,
+        eval_freq: int,
+        data_path: str,
+        trial: Optional[Trial] = None,
     ) -> list[BaseCallback]:
         """
         Get the model specific callbacks
@@ -470,10 +433,10 @@ class ReforceXY(BaseReinforcementLearningModel):
             self.progressbar_callback = ProgressBarCallback()
             callbacks.append(self.progressbar_callback)
 
-        use_masking = self.action_masking and is_masking_supported(self.eval_env)
+        use_masking = self.action_masking and is_masking_supported(eval_env)
         if not trial:
             self.eval_callback = MaskableEvalCallback(
-                self.eval_env,
+                eval_env,
                 eval_freq=eval_freq,
                 deterministic=True,
                 render=False,
@@ -487,7 +450,7 @@ class ReforceXY(BaseReinforcementLearningModel):
         else:
             trial_data_path = f"{data_path}/hyperopt/trial_{trial.number}"
             self.optuna_eval_callback = MaskableTrialEvalCallback(
-                self.eval_env,
+                eval_env,
                 trial,
                 eval_freq=eval_freq,
                 deterministic=True,
@@ -509,8 +472,6 @@ class ReforceXY(BaseReinforcementLearningModel):
         :return: model Any = trained model to be used for inference in dry/live/backtesting
""" - if self.train_env is None or self.eval_env is None: - raise RuntimeError("Environments not set. Cannot run model training") train_df = data_dictionary.get("train_features") train_timesteps = len(train_df) test_df = data_dictionary.get("test_features") @@ -534,12 +495,14 @@ class ReforceXY(BaseReinforcementLearningModel): total_days, ) logger.info("Test: %s steps (%s days)", test_timesteps, test_days) + logger.info("Multiprocessing: %s", self.multiprocessing) + logger.info("Frame stacking: %s", self.frame_stacking) logger.info("Action masking: %s", self.action_masking) logger.info("Hyperopt: %s", self.hyperopt) start_time = time.time() if self.hyperopt: - best_trial_params = self.study(train_df, total_timesteps, dk) + best_trial_params = self.study(train_df, test_df, total_timesteps, dk) if best_trial_params is None: logger.error( "Hyperopt failed. Using default configured model params instead" @@ -572,9 +535,11 @@ class ReforceXY(BaseReinforcementLearningModel): ) eval_freq = self.get_eval_freq(train_timesteps, model_params) - callbacks = self.get_callbacks(eval_freq, str(dk.data_path)) + callbacks = self.get_callbacks(eval_freq, self.eval_env, str(dk.data_path)) try: model.learn(total_timesteps=total_timesteps, callback=callbacks) + except KeyboardInterrupt: + pass finally: if self.progressbar_callback: self.progressbar_callback.on_training_end() @@ -720,7 +685,11 @@ class ReforceXY(BaseReinforcementLearningModel): return False def study( - self, train_df: DataFrame, total_timesteps: int, dk: FreqaiDataKitchen + self, + train_df: DataFrame, + test_df: DataFrame, + total_timesteps: int, + dk: FreqaiDataKitchen, ) -> Optional[Dict[str, Any]]: """ Runs hyperparameter optimization using Optuna and returns the best hyperparameters found merged with the user defined parameters @@ -767,7 +736,9 @@ class ReforceXY(BaseReinforcementLearningModel): start_time = time.time() try: study.optimize( - lambda trial: self.objective(trial, train_df, total_timesteps, dk), + lambda trial: self.objective( + trial, train_df, test_df, total_timesteps, dk + ), n_trials=self.optuna_n_trials, timeout=( hours_to_seconds(self.optuna_timeout_hours) @@ -887,18 +858,85 @@ class ReforceXY(BaseReinforcementLearningModel): return best_trial_params return None + def _get_train_and_eval_environments( + self, + train_df: DataFrame, + test_df: DataFrame, + dk: FreqaiDataKitchen, + prices_train: Optional[DataFrame] = None, + prices_test: Optional[DataFrame] = None, + seed: Optional[int] = None, + env_info: Optional[Dict[str, Any]] = None, + trial: Optional[Trial] = None, + ) -> Tuple[BaseEnvironment, BaseEnvironment]: + if prices_train is None or prices_test is None: + prices_train, prices_test = self.build_ohlc_price_dataframes( + dk.data_dictionary, dk.pair, dk + ) + seed = ( + ( + self.get_model_params().get("seed", 42) + + (trial.number if trial is not None else 0) + ) + if seed is None + else seed + ) + set_random_seed(seed) + env_info = self.pack_env_dict(dk.pair) if env_info is None else env_info + env_prefix = f"trial_{trial.number}_" if trial is not None else "" + + train_fns = [ + make_env( + MyRLEnv, + f"{env_prefix}train_env{i}", + i, + seed, + train_df, + prices_train, + env_info=env_info, + ) + for i in range(self.n_envs) + ] + eval_fns = [ + make_env( + MyRLEnv, + f"{env_prefix}eval_env{i}", + i, + seed + 10_000, + test_df, + prices_test, + env_info=env_info, + ) + for i in range(self.n_envs) + ] + + if self.multiprocessing and self.n_envs > 1: + train_env = SubprocVecEnv(train_fns, start_method="spawn") + 
+            eval_env = SubprocVecEnv(eval_fns, start_method="spawn")
+        else:
+            train_env = DummyVecEnv(train_fns)
+            eval_env = DummyVecEnv(eval_fns)
+
+        train_env = VecMonitor(train_env)
+        eval_env = VecMonitor(eval_env)
+
+        if self.frame_stacking:
+            train_env = VecFrameStack(train_env, n_stack=self.frame_stacking)
+            eval_env = VecFrameStack(eval_env, n_stack=self.frame_stacking)
+
+        return train_env, eval_env
+
     def objective(
         self,
         trial: Trial,
         train_df: DataFrame,
+        test_df: DataFrame,
         total_timesteps: int,
         dk: FreqaiDataKitchen,
     ) -> float:
         """
         Defines a single trial for hyperparameter optimization using Optuna
         """
-        if self.train_env is None or self.eval_env is None:
-            raise RuntimeError("Environments not set. Cannot run HPO trial")
         if "PPO" in self.model_type:
             params = sample_params_ppo(trial, self.n_envs)
             if params.get("n_steps", 0) * self.n_envs > total_timesteps:
@@ -940,15 +978,19 @@ class ReforceXY(BaseReinforcementLearningModel):
         logger.info("------------ Hyperopt trial %d ------------", trial.number)
         logger.info("Trial %s params: %s", trial.number, params)
 
+        train_env, eval_env = self._get_train_and_eval_environments(
+            train_df, test_df, dk, trial=trial
+        )
+
         model = self.MODELCLASS(
             self.policy_type,
-            self.train_env,
+            train_env,
             tensorboard_log=tensorboard_log_path,
             **params,
         )
 
         eval_freq = self.get_eval_freq(len(train_df), params)
-        callbacks = self.get_callbacks(eval_freq, str(dk.data_path), trial)
+        callbacks = self.get_callbacks(eval_env, eval_freq, str(dk.data_path), trial)
         try:
             model.learn(total_timesteps=total_timesteps, callback=callbacks)
         except AssertionError:
@@ -972,6 +1014,11 @@ class ReforceXY(BaseReinforcementLearningModel):
         finally:
             if self.progressbar_callback:
                 self.progressbar_callback.on_training_end()
+            train_env.close()
+            eval_env.close()
+            if hasattr(model, "env") and model.env is not None:
+                model.env.close()
+            del model, train_env, eval_env
 
         if nan_encountered:
             raise TrialPruned("NaN encountered during training")
@@ -1002,7 +1049,7 @@ def make_env(
     env_id: str,
     rank: int,
     seed: int,
-    train_df: DataFrame,
+    df: DataFrame,
     price: DataFrame,
     env_info: Dict[str, Any],
 ) -> Callable[[], BaseEnvironment]:
@@ -1013,7 +1060,7 @@
     :param env_id: (str) the environment ID
     :param rank: (int) index of the subprocess
     :param seed: (int) the initial seed for RNG
-    :param train_df: (DataFrame) feature dataframe for the environment
+    :param df: (DataFrame) feature dataframe for the environment
     :param price: (DataFrame) aligned price dataframe
     :param env_info: (dict) all required arguments to instantiate the environment
     :return:
@@ -1021,9 +1068,7 @@
     """
 
     def _init() -> BaseEnvironment:
-        return MyRLEnv(
-            df=train_df, prices=price, id=env_id, seed=seed + rank, **env_info
-        )
+        return MyRLEnv(df=df, prices=price, id=env_id, seed=seed + rank, **env_info)
 
     return _init
 
-- 
2.43.0
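
Editor's note: the new _get_train_and_eval_environments() combines a per-rank seeded
make_env factory, SubprocVecEnv/DummyVecEnv selection, and VecMonitor/VecFrameStack
wrapping. The following is a minimal, self-contained sketch of that construction
pattern outside FreqAI; gymnasium's CartPole-v1 stands in for MyRLEnv, and
build_vec_env and its parameters are illustrative names, not part of ReforceXY.

from typing import Callable

import gymnasium as gym
from stable_baselines3.common.utils import set_random_seed
from stable_baselines3.common.vec_env import (
    DummyVecEnv,
    SubprocVecEnv,
    VecFrameStack,
    VecMonitor,
)


def make_env(env_id: str, rank: int, seed: int) -> Callable[[], gym.Env]:
    """Return a closure that builds one seeded environment instance."""

    def _init() -> gym.Env:
        env = gym.make(env_id)
        # Offset the seed by the worker rank so parallel environments differ;
        # later unseeded resets keep this RNG state in gymnasium.
        env.reset(seed=seed + rank)
        return env

    return _init


def build_vec_env(
    env_id: str,
    n_envs: int,
    seed: int,
    multiprocessing: bool = False,
    frame_stacking: int = 0,
):
    # Seed Python, NumPy and PyTorch globally, as the patch does before env creation.
    set_random_seed(seed)
    fns = [make_env(env_id, i, seed) for i in range(n_envs)]
    if multiprocessing and n_envs > 1:
        venv = SubprocVecEnv(fns, start_method="spawn")
    else:
        venv = DummyVecEnv(fns)
    venv = VecMonitor(venv)  # episode reward/length logging
    if frame_stacking:
        venv = VecFrameStack(venv, n_stack=frame_stacking)
    return venv


if __name__ == "__main__":
    # Train and eval environments get disjoint seed ranges, mirroring seed + 10_000.
    train_env = build_vec_env("CartPole-v1", n_envs=2, seed=42)
    eval_env = build_vec_env("CartPole-v1", n_envs=2, seed=42 + 10_000)
    train_env.close()
    eval_env.close()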
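
Likewise, the per-trial isolation introduced in objective() (seed derived from
trial.number, fresh environments for every trial, cleanup in finally so pruned or
failed trials do not leak resources) can be illustrated with a stripped-down Optuna
objective. The names below (BASE_SEED, the CartPole environment, the sampled
learning_rate) are assumptions for the sketch, not ReforceXY's actual configuration.

import gymnasium as gym
import optuna
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.utils import set_random_seed

BASE_SEED = 42


def objective(trial: optuna.Trial) -> float:
    # Distinct, reproducible seed per trial, as in the patch.
    seed = BASE_SEED + trial.number
    set_random_seed(seed)
    train_env = Monitor(gym.make("CartPole-v1"))
    eval_env = Monitor(gym.make("CartPole-v1"))
    train_env.reset(seed=seed)
    eval_env.reset(seed=seed + 10_000)
    try:
        model = PPO(
            "MlpPolicy",
            train_env,
            learning_rate=trial.suggest_float("learning_rate", 1e-5, 1e-3, log=True),
            seed=seed,
            verbose=0,
        )
        model.learn(total_timesteps=2_048)
        mean_reward, _ = evaluate_policy(model, eval_env, n_eval_episodes=5)
        return float(mean_reward)
    finally:
        # Mirror the patch's cleanup: always close both environments,
        # even when the trial is pruned or raises.
        train_env.close()
        eval_env.close()


if __name__ == "__main__":
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=3)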