]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
perf(reforcexy): isolate and seed HPO envs
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 20 Sep 2025 19:24:08 +0000 (21:24 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 20 Sep 2025 19:24:08 +0000 (21:24 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
ReforceXY/user_data/freqaimodels/ReforceXY.py

index e3eb08835101763694b262016d5869df8953bf84..d98708b8237bd2972790af2c61974ff9c95f47c5 100644 (file)
@@ -248,7 +248,6 @@ class ReforceXY(BaseReinforcementLearningModel):
         test_df = data_dictionary.get("test_features")
         env_dict = self.pack_env_dict(dk.pair)
         seed = self.get_model_params().get("seed", 42)
-        set_random_seed(seed)
 
         if self.check_envs:
             logger.info("Checking environments")
@@ -276,49 +275,9 @@ class ReforceXY(BaseReinforcementLearningModel):
                 _eval_env_check.close()
 
         logger.info("Populating environments: %s", self.n_envs)
-        train_fns = [
-            make_env(
-                MyRLEnv,
-                f"train_env{i}",
-                i,
-                seed,
-                train_df,
-                prices_train,
-                env_info=env_dict,
-            )
-            for i in range(self.n_envs)
-        ]
-        eval_fns = [
-            make_env(
-                MyRLEnv,
-                f"eval_env{i}",
-                i,
-                seed + 10_000,
-                test_df,
-                prices_test,
-                env_info=env_dict,
-            )
-            for i in range(self.n_envs)
-        ]
-
-        logger.info("Multiprocessing: %s", self.multiprocessing)
-        if self.multiprocessing and self.n_envs > 1:
-            train_env = SubprocVecEnv(train_fns, start_method="spawn")
-            eval_env = SubprocVecEnv(eval_fns, start_method="spawn")
-        else:
-            train_env = DummyVecEnv(train_fns)
-            eval_env = DummyVecEnv(eval_fns)
-
-        train_env = VecMonitor(train_env)
-        eval_env = VecMonitor(eval_env)
-
-        if self.frame_stacking:
-            logger.info("Frame stacking: %s", self.frame_stacking)
-            train_env = VecFrameStack(train_env, n_stack=self.frame_stacking)
-            eval_env = VecFrameStack(eval_env, n_stack=self.frame_stacking)
-
-        self.train_env = train_env
-        self.eval_env = eval_env
+        self.train_env, self.eval_env = self._get_train_and_eval_environments(
+            train_df, test_df, dk, prices_train, prices_test, seed, env_dict
+        )
 
     def get_model_params(self) -> Dict[str, Any]:
         """
@@ -439,7 +398,11 @@ class ReforceXY(BaseReinforcementLearningModel):
         return max(1, min(eval_freq, train_timesteps))
 
     def get_callbacks(
-        self, eval_freq: int, data_path: str, trial: Optional[Trial] = None
+        self,
+        eval_env: BaseEnvironment,
+        eval_freq: int,
+        data_path: str,
+        trial: Optional[Trial] = None,
     ) -> list[BaseCallback]:
         """
         Get the model specific callbacks
@@ -470,10 +433,10 @@ class ReforceXY(BaseReinforcementLearningModel):
             self.progressbar_callback = ProgressBarCallback()
             callbacks.append(self.progressbar_callback)
 
-        use_masking = self.action_masking and is_masking_supported(self.eval_env)
+        use_masking = self.action_masking and is_masking_supported(eval_env)
         if not trial:
             self.eval_callback = MaskableEvalCallback(
-                self.eval_env,
+                eval_env,
                 eval_freq=eval_freq,
                 deterministic=True,
                 render=False,
@@ -487,7 +450,7 @@ class ReforceXY(BaseReinforcementLearningModel):
         else:
             trial_data_path = f"{data_path}/hyperopt/trial_{trial.number}"
             self.optuna_eval_callback = MaskableTrialEvalCallback(
-                self.eval_env,
+                eval_env,
                 trial,
                 eval_freq=eval_freq,
                 deterministic=True,
@@ -509,8 +472,6 @@ class ReforceXY(BaseReinforcementLearningModel):
         :return:
         model Any = trained model to be used for inference in dry/live/backtesting
         """
-        if self.train_env is None or self.eval_env is None:
-            raise RuntimeError("Environments not set. Cannot run model training")
         train_df = data_dictionary.get("train_features")
         train_timesteps = len(train_df)
         test_df = data_dictionary.get("test_features")
@@ -534,12 +495,14 @@ class ReforceXY(BaseReinforcementLearningModel):
             total_days,
         )
         logger.info("Test: %s steps (%s days)", test_timesteps, test_days)
+        logger.info("Multiprocessing: %s", self.multiprocessing)
+        logger.info("Frame stacking: %s", self.frame_stacking)
         logger.info("Action masking: %s", self.action_masking)
         logger.info("Hyperopt: %s", self.hyperopt)
 
         start_time = time.time()
         if self.hyperopt:
-            best_trial_params = self.study(train_df, total_timesteps, dk)
+            best_trial_params = self.study(train_df, test_df, total_timesteps, dk)
             if best_trial_params is None:
                 logger.error(
                     "Hyperopt failed. Using default configured model params instead"
@@ -572,9 +535,11 @@ class ReforceXY(BaseReinforcementLearningModel):
             )
 
         eval_freq = self.get_eval_freq(train_timesteps, model_params)
-        callbacks = self.get_callbacks(eval_freq, str(dk.data_path))
+        callbacks = self.get_callbacks(eval_freq, self.eval_env, str(dk.data_path))
         try:
             model.learn(total_timesteps=total_timesteps, callback=callbacks)
+        except KeyboardInterrupt:
+            pass
         finally:
             if self.progressbar_callback:
                 self.progressbar_callback.on_training_end()
@@ -720,7 +685,11 @@ class ReforceXY(BaseReinforcementLearningModel):
             return False
 
     def study(
-        self, train_df: DataFrame, total_timesteps: int, dk: FreqaiDataKitchen
+        self,
+        train_df: DataFrame,
+        test_df: DataFrame,
+        total_timesteps: int,
+        dk: FreqaiDataKitchen,
     ) -> Optional[Dict[str, Any]]:
         """
         Runs hyperparameter optimization using Optuna and returns the best hyperparameters found merged with the user defined parameters
@@ -767,7 +736,9 @@ class ReforceXY(BaseReinforcementLearningModel):
         start_time = time.time()
         try:
             study.optimize(
-                lambda trial: self.objective(trial, train_df, total_timesteps, dk),
+                lambda trial: self.objective(
+                    trial, train_df, test_df, total_timesteps, dk
+                ),
                 n_trials=self.optuna_n_trials,
                 timeout=(
                     hours_to_seconds(self.optuna_timeout_hours)
@@ -887,18 +858,85 @@ class ReforceXY(BaseReinforcementLearningModel):
             return best_trial_params
         return None
 
+    def _get_train_and_eval_environments(
+        self,
+        train_df: DataFrame,
+        test_df: DataFrame,
+        dk: FreqaiDataKitchen,
+        prices_train: Optional[DataFrame] = None,
+        prices_test: Optional[DataFrame] = None,
+        seed: Optional[int] = None,
+        env_info: Optional[Dict[str, Any]] = None,
+        trial: Optional[Trial] = None,
+    ) -> Tuple[BaseEnvironment, BaseEnvironment]:
+        if prices_train is None or prices_test is None:
+            prices_train, prices_test = self.build_ohlc_price_dataframes(
+                dk.data_dictionary, dk.pair, dk
+            )
+        seed = (
+            (
+                self.get_model_params().get("seed", 42)
+                + (trial.number if trial is not None else 0)
+            )
+            if seed is None
+            else seed
+        )
+        set_random_seed(seed)
+        env_info = self.pack_env_dict(dk.pair) if env_info is None else env_info
+        env_prefix = f"trial_{trial.number}_" if trial is not None else ""
+
+        train_fns = [
+            make_env(
+                MyRLEnv,
+                f"{env_prefix}train_env{i}",
+                i,
+                seed,
+                train_df,
+                prices_train,
+                env_info=env_info,
+            )
+            for i in range(self.n_envs)
+        ]
+        eval_fns = [
+            make_env(
+                MyRLEnv,
+                f"{env_prefix}eval_env{i}",
+                i,
+                seed + 10_000,
+                test_df,
+                prices_test,
+                env_info=env_info,
+            )
+            for i in range(self.n_envs)
+        ]
+
+        if self.multiprocessing and self.n_envs > 1:
+            train_env = SubprocVecEnv(train_fns, start_method="spawn")
+            eval_env = SubprocVecEnv(eval_fns, start_method="spawn")
+        else:
+            train_env = DummyVecEnv(train_fns)
+            eval_env = DummyVecEnv(eval_fns)
+
+        train_env = VecMonitor(train_env)
+        eval_env = VecMonitor(eval_env)
+
+        if self.frame_stacking:
+            train_env = VecFrameStack(train_env, n_stack=self.frame_stacking)
+            eval_env = VecFrameStack(eval_env, n_stack=self.frame_stacking)
+
+        return train_env, eval_env
+
     def objective(
         self,
         trial: Trial,
         train_df: DataFrame,
+        test_df: DataFrame,
         total_timesteps: int,
         dk: FreqaiDataKitchen,
     ) -> float:
         """
         Defines a single trial for hyperparameter optimization using Optuna
         """
-        if self.train_env is None or self.eval_env is None:
-            raise RuntimeError("Environments not set. Cannot run HPO trial")
         if "PPO" in self.model_type:
             params = sample_params_ppo(trial, self.n_envs)
             if params.get("n_steps", 0) * self.n_envs > total_timesteps:
@@ -940,15 +978,19 @@ class ReforceXY(BaseReinforcementLearningModel):
         logger.info("------------ Hyperopt trial %d ------------", trial.number)
         logger.info("Trial %s params: %s", trial.number, params)
 
+        train_env, eval_env = self._get_train_and_eval_environments(
+            train_df, test_df, dk, trial=trial
+        )
+
         model = self.MODELCLASS(
             self.policy_type,
-            self.train_env,
+            train_env,
             tensorboard_log=tensorboard_log_path,
             **params,
         )
 
         eval_freq = self.get_eval_freq(len(train_df), params)
-        callbacks = self.get_callbacks(eval_freq, str(dk.data_path), trial)
+        callbacks = self.get_callbacks(eval_freq, eval_env, str(dk.data_path), trial)
         try:
             model.learn(total_timesteps=total_timesteps, callback=callbacks)
         except AssertionError:
@@ -972,6 +1014,11 @@ class ReforceXY(BaseReinforcementLearningModel):
         finally:
             if self.progressbar_callback:
                 self.progressbar_callback.on_training_end()
+            train_env.close()
+            eval_env.close()
+            if hasattr(model, "env") and model.env is not None:
+                model.env.close()
+            del model, train_env, eval_env
 
         if nan_encountered:
             raise TrialPruned("NaN encountered during training")
@@ -1002,7 +1049,7 @@ def make_env(
     env_id: str,
     rank: int,
     seed: int,
-    train_df: DataFrame,
+    df: DataFrame,
     price: DataFrame,
     env_info: Dict[str, Any],
 ) -> Callable[[], BaseEnvironment]:
@@ -1013,7 +1060,7 @@ def make_env(
     :param env_id: (str) the environment ID
     :param rank: (int) index of the subprocess
     :param seed: (int) the initial seed for RNG
-    :param train_df: (DataFrame) feature dataframe for the environment
+    :param df: (DataFrame) feature dataframe for the environment
     :param price: (DataFrame) aligned price dataframe
     :param env_info: (dict) all required arguments to instantiate the environment
     :return:
@@ -1021,9 +1068,7 @@ def make_env(
     """
 
     def _init() -> BaseEnvironment:
-        return MyRLEnv(
-            df=train_df, prices=price, id=env_id, seed=seed + rank, **env_info
-        )
+        return MyRLEnv(df=df, prices=price, id=env_id, seed=seed + rank, **env_info)
 
     return _init