test_df = data_dictionary.get("test_features")
env_dict = self.pack_env_dict(dk.pair)
seed = self.get_model_params().get("seed", 42)
- set_random_seed(seed)
if self.check_envs:
logger.info("Checking environments")
_eval_env_check.close()
logger.info("Populating environments: %s", self.n_envs)
- train_fns = [
- make_env(
- MyRLEnv,
- f"train_env{i}",
- i,
- seed,
- train_df,
- prices_train,
- env_info=env_dict,
- )
- for i in range(self.n_envs)
- ]
- eval_fns = [
- make_env(
- MyRLEnv,
- f"eval_env{i}",
- i,
- seed + 10_000,
- test_df,
- prices_test,
- env_info=env_dict,
- )
- for i in range(self.n_envs)
- ]
-
- logger.info("Multiprocessing: %s", self.multiprocessing)
- if self.multiprocessing and self.n_envs > 1:
- train_env = SubprocVecEnv(train_fns, start_method="spawn")
- eval_env = SubprocVecEnv(eval_fns, start_method="spawn")
- else:
- train_env = DummyVecEnv(train_fns)
- eval_env = DummyVecEnv(eval_fns)
-
- train_env = VecMonitor(train_env)
- eval_env = VecMonitor(eval_env)
-
- if self.frame_stacking:
- logger.info("Frame stacking: %s", self.frame_stacking)
- train_env = VecFrameStack(train_env, n_stack=self.frame_stacking)
- eval_env = VecFrameStack(eval_env, n_stack=self.frame_stacking)
-
- self.train_env = train_env
- self.eval_env = eval_env
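+ # build the train/eval environments via the shared helper (also reused per trial during hyperopt)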
+ self.train_env, self.eval_env = self._get_train_and_eval_environments(
+ train_df, test_df, dk, prices_train, prices_test, seed, env_dict
+ )
def get_model_params(self) -> Dict[str, Any]:
"""
return max(1, min(eval_freq, train_timesteps))
def get_callbacks(
- self, eval_freq: int, data_path: str, trial: Optional[Trial] = None
+ self,
+ eval_env: VecEnv,
+ eval_freq: int,
+ data_path: str,
+ trial: Optional[Trial] = None,
) -> list[BaseCallback]:
"""
Get the model specific callbacks
self.progressbar_callback = ProgressBarCallback()
callbacks.append(self.progressbar_callback)
- use_masking = self.action_masking and is_masking_supported(self.eval_env)
+ use_masking = self.action_masking and is_masking_supported(eval_env)
if not trial:
self.eval_callback = MaskableEvalCallback(
- self.eval_env,
+ eval_env,
eval_freq=eval_freq,
deterministic=True,
render=False,
else:
trial_data_path = f"{data_path}/hyperopt/trial_{trial.number}"
self.optuna_eval_callback = MaskableTrialEvalCallback(
- self.eval_env,
+ eval_env,
trial,
eval_freq=eval_freq,
deterministic=True,
:return:
model Any = trained model to be used for inference in dry/live/backtesting
"""
- if self.train_env is None or self.eval_env is None:
- raise RuntimeError("Environments not set. Cannot run model training")
train_df = data_dictionary.get("train_features")
train_timesteps = len(train_df)
test_df = data_dictionary.get("test_features")
total_days,
)
logger.info("Test: %s steps (%s days)", test_timesteps, test_days)
+ logger.info("Multiprocessing: %s", self.multiprocessing)
+ logger.info("Frame stacking: %s", self.frame_stacking)
logger.info("Action masking: %s", self.action_masking)
logger.info("Hyperopt: %s", self.hyperopt)
start_time = time.time()
if self.hyperopt:
- best_trial_params = self.study(train_df, total_timesteps, dk)
+ best_trial_params = self.study(train_df, test_df, total_timesteps, dk)
if best_trial_params is None:
logger.error(
"Hyperopt failed. Using default configured model params instead"
)
eval_freq = self.get_eval_freq(train_timesteps, model_params)
- callbacks = self.get_callbacks(eval_freq, str(dk.data_path))
+ callbacks = self.get_callbacks(self.eval_env, eval_freq, str(dk.data_path))
try:
model.learn(total_timesteps=total_timesteps, callback=callbacks)
+ except KeyboardInterrupt:
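+ # treat a manual interrupt as an early stop instead of propagating the exception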
+ pass
finally:
if self.progressbar_callback:
self.progressbar_callback.on_training_end()
return False
def study(
- self, train_df: DataFrame, total_timesteps: int, dk: FreqaiDataKitchen
+ self,
+ train_df: DataFrame,
+ test_df: DataFrame,
+ total_timesteps: int,
+ dk: FreqaiDataKitchen,
) -> Optional[Dict[str, Any]]:
"""
Runs hyperparameter optimization using Optuna and returns the best hyperparameters found, merged with the user-defined parameters
start_time = time.time()
try:
study.optimize(
- lambda trial: self.objective(trial, train_df, total_timesteps, dk),
+ lambda trial: self.objective(
+ trial, train_df, test_df, total_timesteps, dk
+ ),
n_trials=self.optuna_n_trials,
timeout=(
hours_to_seconds(self.optuna_timeout_hours)
return best_trial_params
return None
+ def _get_train_and_eval_environments(
+ self,
+ train_df: DataFrame,
+ test_df: DataFrame,
+ dk: FreqaiDataKitchen,
+ prices_train: Optional[DataFrame] = None,
+ prices_test: Optional[DataFrame] = None,
+ seed: Optional[int] = None,
+ env_info: Optional[Dict[str, Any]] = None,
+ trial: Optional[Trial] = None,
+ ) -> Tuple[VecEnv, VecEnv]:
+ if prices_train is None or prices_test is None:
+ prices_train, prices_test = self.build_ohlc_price_dataframes(
+ dk.data_dictionary, dk.pair, dk
+ )
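+ # default seed comes from the model params, offset by the trial number so each HPO trial is seeded differently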
+ if seed is None:
+ seed = self.get_model_params().get("seed", 42) + (
+ trial.number if trial is not None else 0
+ )
+ set_random_seed(seed)
+ env_info = self.pack_env_dict(dk.pair) if env_info is None else env_info
+ env_prefix = f"trial_{trial.number}_" if trial is not None else ""
+
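+ # one factory per vectorized worker; eval envs use a seed offset so their RNG streams do not overlap training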
+ train_fns = [
+ make_env(
+ MyRLEnv,
+ f"{env_prefix}train_env{i}",
+ i,
+ seed,
+ train_df,
+ prices_train,
+ env_info=env_info,
+ )
+ for i in range(self.n_envs)
+ ]
+ eval_fns = [
+ make_env(
+ MyRLEnv,
+ f"{env_prefix}eval_env{i}",
+ i,
+ seed + 10_000,
+ test_df,
+ prices_test,
+ env_info=env_info,
+ )
+ for i in range(self.n_envs)
+ ]
+
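+ # spawn-based subprocess workers when multiprocessing is enabled, otherwise in-process DummyVecEnv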
+ if self.multiprocessing and self.n_envs > 1:
+ train_env = SubprocVecEnv(train_fns, start_method="spawn")
+ eval_env = SubprocVecEnv(eval_fns, start_method="spawn")
+ else:
+ train_env = DummyVecEnv(train_fns)
+ eval_env = DummyVecEnv(eval_fns)
+
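+ # VecMonitor records episode reward/length statistics for logging and evaluation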
+ train_env = VecMonitor(train_env)
+ eval_env = VecMonitor(eval_env)
+
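+ # optionally stack the last n_stack observations to give the policy temporal context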
+ if self.frame_stacking:
+ train_env = VecFrameStack(train_env, n_stack=self.frame_stacking)
+ eval_env = VecFrameStack(eval_env, n_stack=self.frame_stacking)
+
+ return train_env, eval_env
+
def objective(
self,
trial: Trial,
train_df: DataFrame,
+ test_df: DataFrame,
total_timesteps: int,
dk: FreqaiDataKitchen,
) -> float:
"""
Defines a single trial for hyperparameter optimization using Optuna
"""
- if self.train_env is None or self.eval_env is None:
- raise RuntimeError("Environments not set. Cannot run HPO trial")
if "PPO" in self.model_type:
params = sample_params_ppo(trial, self.n_envs)
if params.get("n_steps", 0) * self.n_envs > total_timesteps:
logger.info("------------ Hyperopt trial %d ------------", trial.number)
logger.info("Trial %s params: %s", trial.number, params)
+ train_env, eval_env = self._get_train_and_eval_environments(
+ train_df, test_df, dk, trial=trial
+ )
+
model = self.MODELCLASS(
self.policy_type,
- self.train_env,
+ train_env,
tensorboard_log=tensorboard_log_path,
**params,
)
eval_freq = self.get_eval_freq(len(train_df), params)
- callbacks = self.get_callbacks(eval_freq, str(dk.data_path), trial)
+ callbacks = self.get_callbacks(eval_env, eval_freq, str(dk.data_path), trial)
try:
model.learn(total_timesteps=total_timesteps, callback=callbacks)
except AssertionError:
finally:
if self.progressbar_callback:
self.progressbar_callback.on_training_end()
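+ # explicitly close environments and drop references so subprocess workers terminate between trials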
+ train_env.close()
+ eval_env.close()
+ if hasattr(model, "env") and model.env is not None:
+ model.env.close()
+ del model, train_env, eval_env
if nan_encountered:
raise TrialPruned("NaN encountered during training")
env_id: str,
rank: int,
seed: int,
- train_df: DataFrame,
+ df: DataFrame,
price: DataFrame,
env_info: Dict[str, Any],
) -> Callable[[], BaseEnvironment]:
:param env_id: (str) the environment ID
:param rank: (int) index of the subprocess
:param seed: (int) the initial seed for RNG
- :param train_df: (DataFrame) feature dataframe for the environment
+ :param df: (DataFrame) feature dataframe for the environment
:param price: (DataFrame) aligned price dataframe
:param env_info: (dict) all required arguments to instantiate the environment
:return:
"""
def _init() -> BaseEnvironment:
- return MyRLEnv(
- df=train_df, prices=price, id=env_id, seed=seed + rank, **env_info
- )
+ return MyRLEnv(df=df, prices=price, id=env_id, seed=seed + rank, **env_info)
return _init