From 3f2697bc331ea6208c09a0e78f13e317212787c6 Mon Sep 17 00:00:00 2001
From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?=
Date: Sun, 12 Oct 2025 16:19:33 +0200
Subject: [PATCH] feat(reforcexy): add support for RecurrentPPO model
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Signed-off-by: Jérôme Benoit
---
 README.md                                     |   4 +
 ReforceXY/user_data/freqaimodels/ReforceXY.py | 245 +++++++++++-------
 .../freqaimodels/QuickAdapterRegressorV3.py   |   2 +-
 3 files changed, 159 insertions(+), 92 deletions(-)

diff --git a/README.md b/README.md
index c8837f4..a807d5e 100644
--- a/README.md
+++ b/README.md
@@ -100,6 +100,10 @@ Then build and start the container:
 docker compose up -d --build
 ```
 
+### Supported models
+
+PPO, MaskablePPO, RecurrentPPO, DQN, QRDQN
+
 ### Configuration tunables
 
 The documented list of model tunables is at the top of the [ReforceXY.py](./ReforceXY/user_data/freqaimodels/ReforceXY.py) file.
diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py
index 1388032..0252b4d 100644
--- a/ReforceXY/user_data/freqaimodels/ReforceXY.py
+++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py
@@ -5,7 +5,7 @@ import logging
 import math
 import time
 import warnings
-from collections import defaultdict
+from collections import defaultdict, deque
 from collections.abc import Mapping
 from pathlib import Path
 from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Type, Union
@@ -125,11 +125,10 @@ class ReforceXY(BaseReinforcementLearningModel):
             raise ValueError(
                 "FreqAI model requires StaticPairList method defined in pairlists configuration and pair_whitelist defined in exchange section configuration"
             )
-        self.action_masking: bool = (
-            self.model_type == "MaskablePPO"
-        )  # Enable action masking
+        self.action_masking: bool = self.model_type == "MaskablePPO"
         self.rl_config.setdefault("action_masking", self.action_masking)
         self.inference_masking: bool = self.rl_config.get("inference_masking", True)
+        self.recurrent: bool = self.model_type == "RecurrentPPO"
         self.lr_schedule: bool = self.rl_config.get("lr_schedule", False)
         self.cr_schedule: bool = self.rl_config.get("cr_schedule", False)
         self.n_envs: int = self.rl_config.get("n_envs", 1)
@@ -354,8 +353,7 @@ class ReforceXY(BaseReinforcementLearningModel):
 
         model_params: Dict[str, Any] = copy.deepcopy(self.model_training_parameters)
 
-        if model_params.get("seed") is None:
-            model_params["seed"] = 42
+        model_params.setdefault("seed", 42)
 
         if not self.hyperopt and self.lr_schedule:
             lr = model_params.get("learning_rate", 0.0003)
@@ -438,8 +436,11 @@ class ReforceXY(BaseReinforcementLearningModel):
             model_params.get("policy_kwargs", {}).get("activation_fn", "relu")
         )
         model_params["policy_kwargs"]["optimizer_class"] = get_optimizer_class(
-            model_params.get("policy_kwargs", {}).get("optimizer_class", "adam")
+            model_params.get("policy_kwargs", {}).get("optimizer_class", "adamw")
         )
+        if "RecurrentPPO" in self.model_type:
+            model_params["policy_kwargs"].setdefault("lstm_hidden_size", 256)
+            model_params["policy_kwargs"].setdefault("n_lstm_layers", 1)
 
         self._model_params_cache = model_params
         return copy.deepcopy(self._model_params_cache)
@@ -610,6 +611,7 @@ class ReforceXY(BaseReinforcementLearningModel):
         logger.info("Eval multiprocessing: %s", self.eval_multiprocessing)
         logger.info("Frame stacking: %s", self.frame_stacking)
         logger.info("Action masking: %s", self.action_masking)
+        logger.info("Recurrent: %s", self.recurrent)
         logger.info("Hyperopt: %s", self.hyperopt)
 
         start_time = time.time()
@@ -725,9 +727,14 @@ class ReforceXY(BaseReinforcementLearningModel):
         n = np_dataframe.shape[0]
         window_size: int = self.CONV_WIDTH
         frame_stacking: int = self.frame_stacking
-        frame_stacking_activated: bool = bool(frame_stacking) and frame_stacking > 1
+        frame_stacking_enabled: bool = bool(frame_stacking) and frame_stacking > 1
         inference_masking: bool = self.action_masking and self.inference_masking
 
+        if window_size <= 0 or n < window_size:
+            return DataFrame(
+                {label: [np.nan] * n for label in dk.label_list}, index=dataframe.index
+            )
+
         def _update_virtual_position(action: int, position: Positions) -> Positions:
             if action == Actions.Long_enter.value and position == Positions.Neutral:
                 return Positions.Long
@@ -751,11 +758,13 @@ class ReforceXY(BaseReinforcementLearningModel):
                 return current_virtual_trade_duration + 1
             return 0
 
-        frame_buffer: List[NDArray[np.float32]] = []
+        frame_buffer = deque(maxlen=frame_stacking if frame_stacking_enabled else None)
         zero_frame: Optional[NDArray[np.float32]] = None
+        lstm_states: Optional[Tuple[NDArray[np.float32], NDArray[np.float32]]] = None
+        episode_start = np.array([True], dtype=bool)
 
         def _predict(start_idx: int) -> int:
-            nonlocal zero_frame
+            nonlocal zero_frame, lstm_states, episode_start
             end_idx: int = start_idx + window_size
             np_observation = np_dataframe[start_idx:end_idx, :]
             action_masks_param: Dict[str, Any] = {}
@@ -771,6 +780,7 @@ class ReforceXY(BaseReinforcementLearningModel):
                         ),
                         (np_observation.shape[0], 1),
                     )
+                    action_mask_position = position
                 else:
                     state_block = np.tile(
                         np.array(
@@ -783,20 +793,20 @@ class ReforceXY(BaseReinforcementLearningModel):
                         ),
                         (np_observation.shape[0], 1),
                     )
+                    action_mask_position = virtual_position
                 np_observation = np.concatenate([np_observation, state_block], axis=1)
+            else:
+                action_mask_position = virtual_position
 
-            fb: List[NDArray[np.float32]] = frame_buffer
-            if frame_stacking_activated:
-                fb.append(np_observation)
-                if len(fb) > frame_stacking:
-                    del fb[0 : len(fb) - frame_stacking]
-                if len(fb) < frame_stacking:
-                    pad_count = frame_stacking - len(fb)
+            if frame_stacking_enabled:
+                frame_buffer.append(np_observation)
+                if len(frame_buffer) < frame_stacking:
+                    pad_count = frame_stacking - len(frame_buffer)
                     if zero_frame is None:
                         zero_frame = np.zeros_like(np_observation, dtype=np.float32)
-                    fb_padded = [zero_frame] * pad_count + fb
+                    fb_padded = [zero_frame] * pad_count + list(frame_buffer)
                 else:
-                    fb_padded = fb
+                    fb_padded = list(frame_buffer)
                 stacked_observations = np.concatenate(fb_padded, axis=1)
                 observations = stacked_observations.reshape(
                     1, stacked_observations.shape[0], stacked_observations.shape[1]
@@ -808,12 +818,23 @@ class ReforceXY(BaseReinforcementLearningModel):
 
             if inference_masking:
                 action_masks_param["action_masks"] = ReforceXY.get_action_masks(
-                    self.can_short, virtual_position
+                    self.can_short, action_mask_position
                 )
+
+            if self.recurrent:
+                action, lstm_states = model.predict(
+                    observations,
+                    state=lstm_states,
+                    episode_start=episode_start,
+                    deterministic=True,
+                    **action_masks_param,
+                )
+                episode_start[:] = False
+            else:
+                action, _ = model.predict(
+                    observations, deterministic=True, **action_masks_param
+                )
 
-            action, _ = model.predict(
-                observations, deterministic=True, **action_masks_param
-            )
             return int(action)
 
         predicted_actions: List[int] = []
@@ -830,9 +851,9 @@ class ReforceXY(BaseReinforcementLearningModel):
 
         pad_count = max(0, n - len(predicted_actions))
         actions_list = ([np.nan] * pad_count) + predicted_actions
DataFrame({"action": actions_list}, index=dataframe.index) + actions_df = DataFrame({"action": actions_list}, index=dataframe.index) - return DataFrame({label: actions["action"] for label in dk.label_list}) + return DataFrame({label: actions_df["action"] for label in dk.label_list}) @staticmethod def study_delete(study_name: str, storage: BaseStorage) -> None: @@ -1123,27 +1144,39 @@ class ReforceXY(BaseReinforcementLearningModel): return train_env, eval_env + def get_optuna_params(self, trial: Trial) -> Dict[str, Any]: + if "RecurrentPPO" in self.model_type: + return sample_params_recurrentppo(trial) + elif "PPO" in self.model_type: + return sample_params_ppo(trial) + elif "QRDQN" in self.model_type: + return sample_params_qrdqn(trial) + elif "DQN" in self.model_type: + return sample_params_dqn(trial) + else: + raise NotImplementedError(f"{self.model_type} not supported for hyperopt") + def objective( self, trial: Trial, dk: FreqaiDataKitchen, total_timesteps: int ) -> float: """ - Defines a single trial for hyperparameter optimization using Optuna + Objective function for Optuna trials hyperparameter optimization """ logger.info("------------ Hyperopt trial %d ------------", trial.number) + params = self.get_optuna_params(trial) + if "PPO" in self.model_type: - params = sample_params_ppo(trial, self.n_envs) - n_steps = params.get("n_steps", 0) + n_steps = params.get("n_steps") if n_steps * self.n_envs > total_timesteps: raise TrialPruned( f"{n_steps=} * n_envs={self.n_envs}={n_steps * self.n_envs} is greater than {total_timesteps=}" ) - elif "QRDQN" in self.model_type: - params = sample_params_qrdqn(trial) - elif "DQN" in self.model_type: - params = sample_params_dqn(trial) - else: - raise NotImplementedError + batch_size = params.get("batch_size") + if (n_steps * self.n_envs) % batch_size != 0: + raise TrialPruned( + f"{n_steps=} * {self.n_envs=} = {n_steps * self.n_envs} is not divisible by {batch_size=}" + ) if "DQN" in self.model_type: gradient_steps = params.get("gradient_steps") @@ -1155,6 +1188,9 @@ class ReforceXY(BaseReinforcementLearningModel): raise TrialPruned( f"{batch_size=} * {gradient_steps=}={batch_size * gradient_steps} is greater than {buffer_size=}" ) + learning_starts = params.get("learning_starts") + if learning_starts > buffer_size: + raise TrialPruned(f"{learning_starts=} is greater than {buffer_size=}") # Ensure that the sampled parameters take precedence params = deepmerge(self.get_model_params(), params) @@ -2142,7 +2178,7 @@ class InfoMetricsCallback(TensorboardCallback): train_freq: Optional[Union[TrainFreq, int, Tuple[int, ...], List[int]]], ) -> Optional[int]: train_freq_val: Optional[int] = None - if isinstance(train_freq, TrainFreq): + if isinstance(train_freq, TrainFreq) and hasattr(train_freq, "frequency"): if isinstance(train_freq.frequency, int): train_freq_val = train_freq.frequency elif isinstance(train_freq, (tuple, list)) and train_freq: @@ -2202,6 +2238,17 @@ class InfoMetricsCallback(TensorboardCallback): ) if getattr(self.model, "target_kl", None) is not None: hparam_dict["target_kl"] = float(self.model.target_kl) + if "RecurrentPPO" in self.model.__class__.__name__: + policy = getattr(self.model, "policy", None) + if policy is not None: + lstm_actor = getattr(policy, "lstm_actor", None) + if lstm_actor is not None: + hparam_dict.update( + { + "lstm_hidden_size": int(lstm_actor.hidden_size), + "n_lstm_layers": int(lstm_actor.num_layers), + } + ) if "DQN" in self.model.__class__.__name__: hparam_dict.update( { @@ -2269,13 +2316,13 @@ class 
 
         logger_exclude = ("stdout", "log", "json", "csv")
 
-        def _is_numeric_non_bool(x: Any) -> bool:
+        def _is_number(x: Any) -> bool:
             return isinstance(
                 x, (int, float, np.integer, np.floating)
             ) and not isinstance(x, bool)
 
         def _is_finite_number(x: Any) -> bool:
-            if not _is_numeric_non_bool(x):
+            if not _is_number(x):
                 return False
             try:
                 return np.isfinite(float(x))
@@ -2300,7 +2347,7 @@ class InfoMetricsCallback(TensorboardCallback):
                     continue
                 if _is_finite_number(v):
                     numeric_acc[k].append(float(v))
-                elif _is_numeric_non_bool(v):
+                elif _is_number(v):
                     filtered_values += 1
                 else:
                     non_numeric_counts[k][v] += 1
@@ -2801,7 +2848,7 @@ def get_activation_fn(
 
 
 def get_optimizer_class(
-    optimizer_class_name: Literal["adam", "adamw"],
+    optimizer_class_name: Literal["adam", "adamw", "rmsprop"],
 ) -> Type[th.optim.Optimizer]:
     """
     Get optimizer class
@@ -2809,6 +2856,7 @@
     return {
         "adam": th.optim.Adam,
         "adamw": th.optim.AdamW,
+        "rmsprop": th.optim.RMSprop,
     }.get(optimizer_class_name, th.optim.Adam)
 
 
@@ -2857,6 +2905,11 @@
         )
         if optuna_params.get("target_kl") is not None:
             model_params["target_kl"] = float(optuna_params.get("target_kl"))
+        if "RecurrentPPO" in model_type:
+            policy_kwargs["lstm_hidden_size"] = int(
+                optuna_params.get("lstm_hidden_size")
+            )
+            policy_kwargs["n_lstm_layers"] = int(optuna_params.get("n_lstm_layers"))
     elif "DQN" in model_type:
         required_dqn_params = [
             "gamma",
@@ -2927,50 +2980,63 @@
 PPO_N_STEPS: Tuple[int, ...] = (512, 1024, 2048, 4096)
 
 
-def sample_params_ppo(trial: Trial, n_envs: int) -> Dict[str, Any]:
+def get_common_ppo_optuna_params(trial: Trial) -> Dict[str, Any]:
+    return {
+        "n_steps": trial.suggest_categorical("n_steps", list(PPO_N_STEPS)),
+        "batch_size": trial.suggest_categorical(
+            "batch_size", [64, 128, 256, 512, 1024]
+        ),
+        "gamma": trial.suggest_categorical(
+            "gamma", [0.93, 0.95, 0.97, 0.98, 0.99, 0.995, 0.997, 0.999, 0.9999]
+        ),
+        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 3e-3, log=True),
+        "ent_coef": trial.suggest_float("ent_coef", 0.0005, 0.03, log=True),
+        "clip_range": trial.suggest_float("clip_range", 0.1, 0.4, step=0.05),
+        "n_epochs": trial.suggest_int("n_epochs", 1, 5),
+        "gae_lambda": trial.suggest_float("gae_lambda", 0.9, 0.99, step=0.01),
+        "max_grad_norm": trial.suggest_float("max_grad_norm", 0.3, 1.0, step=0.05),
+        "vf_coef": trial.suggest_float("vf_coef", 0.0, 1.0, step=0.05),
+        "lr_schedule": trial.suggest_categorical("lr_schedule", ["linear", "constant"]),
+        "cr_schedule": trial.suggest_categorical("cr_schedule", ["linear", "constant"]),
+        "target_kl": trial.suggest_categorical(
+            "target_kl", [None, 0.01, 0.015, 0.02, 0.03, 0.04]
+        ),
+        "ortho_init": trial.suggest_categorical("ortho_init", [True, False]),
+        "net_arch": trial.suggest_categorical(
+            "net_arch", ["small", "medium", "large", "extra_large"]
+        ),
+        "activation_fn": trial.suggest_categorical(
+            "activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
+        ),
+        "optimizer_class": trial.suggest_categorical(
+            "optimizer_class", ["adamw", "rmsprop"]
+        ),
+    }
+
+
+def sample_params_ppo(trial: Trial) -> Dict[str, Any]:
     """
     Sampler for PPO hyperparams
     """
-    n_steps = trial.suggest_categorical("n_steps", list(PPO_N_STEPS))
-    batch_size = trial.suggest_categorical("batch_size", [64, 128, 256, 512, 1024])
-    if (n_steps * n_envs) % batch_size != 0:
-        raise TrialPruned(
-            f"{n_steps=} * {n_envs=} = {n_steps * n_envs} is not divisible by {batch_size=}"
-        )
     return convert_optuna_params_to_model_params(
-        "PPO",
+        "PPO", get_common_ppo_optuna_params(trial)
+    )
+
+
+def sample_params_recurrentppo(trial: Trial) -> Dict[str, Any]:
+    """
+    Sampler for RecurrentPPO hyperparams
+    """
+    ppo_optuna_params = get_common_ppo_optuna_params(trial)
+    ppo_optuna_params.update(
         {
-            "n_steps": n_steps,
-            "batch_size": batch_size,
-            "gamma": trial.suggest_categorical(
-                "gamma", [0.93, 0.95, 0.97, 0.98, 0.99, 0.995, 0.997, 0.999, 0.9999]
-            ),
-            "learning_rate": trial.suggest_float("learning_rate", 1e-5, 3e-3, log=True),
-            "ent_coef": trial.suggest_float("ent_coef", 0.0005, 0.03, log=True),
-            "clip_range": trial.suggest_float("clip_range", 0.1, 0.4, step=0.05),
-            "n_epochs": trial.suggest_categorical("n_epochs", [1, 2, 3, 4, 5]),
-            "gae_lambda": trial.suggest_float("gae_lambda", 0.9, 0.99, step=0.01),
-            "max_grad_norm": trial.suggest_float("max_grad_norm", 0.3, 1.0, step=0.05),
-            "vf_coef": trial.suggest_float("vf_coef", 0.0, 1.0, step=0.05),
-            "lr_schedule": trial.suggest_categorical(
-                "lr_schedule", ["linear", "constant"]
-            ),
-            "cr_schedule": trial.suggest_categorical(
-                "cr_schedule", ["linear", "constant"]
+            "lstm_hidden_size": trial.suggest_categorical(
+                "lstm_hidden_size", [64, 128, 256, 512]
             ),
-            "target_kl": trial.suggest_categorical(
-                "target_kl", [None, 0.01, 0.015, 0.02, 0.03, 0.04]
-            ),
-            "ortho_init": trial.suggest_categorical("ortho_init", [False, True]),
-            "net_arch": trial.suggest_categorical(
-                "net_arch", ["small", "medium", "large", "extra_large"]
-            ),
-            "activation_fn": trial.suggest_categorical(
-                "activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
-            ),
-            "optimizer_class": trial.suggest_categorical("optimizer_class", ["adamw"]),
-        },
-    )
+            "n_lstm_layers": trial.suggest_int("n_lstm_layers", 1, 2),
+        }
+    )
+    return convert_optuna_params_to_model_params("RecurrentPPO", ppo_optuna_params)
 
 
 def get_common_dqn_optuna_params(trial: Trial) -> Dict[str, Any]:
@@ -2986,17 +3052,6 @@ def get_common_dqn_optuna_params(trial: Trial) -> Dict[str, Any]:
         min_fraction = 0.15
     else:
         min_fraction = 0.05
-    exploration_fraction = trial.suggest_float(
-        "exploration_fraction", min_fraction, 0.9, step=0.02
-    )
-    buffer_size = trial.suggest_categorical(
-        "buffer_size", [int(1e4), int(5e4), int(1e5), int(2e5)]
-    )
-    learning_starts = trial.suggest_categorical(
-        "learning_starts", [500, 1000, 2000, 3000, 4000, 5000, 8000, 10000]
-    )
-    if learning_starts > buffer_size:
-        raise TrialPruned(f"{learning_starts=} is greater than {buffer_size=}")
     return {
         "train_freq": trial.suggest_categorical(
             "train_freq", [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]
@@ -3010,21 +3065,29 @@
         ),
         "learning_rate": trial.suggest_float("learning_rate", 1e-5, 3e-3, log=True),
         "lr_schedule": trial.suggest_categorical("lr_schedule", ["linear", "constant"]),
-        "buffer_size": buffer_size,
+        "buffer_size": trial.suggest_categorical(
+            "buffer_size", [int(1e4), int(5e4), int(1e5), int(2e5)]
+        ),
         "exploration_initial_eps": exploration_initial_eps,
         "exploration_final_eps": exploration_final_eps,
-        "exploration_fraction": exploration_fraction,
+        "exploration_fraction": trial.suggest_float(
+            "exploration_fraction", min_fraction, 0.9, step=0.02
+        ),
         "target_update_interval": trial.suggest_categorical(
             "target_update_interval", [1000, 2000, 5000, 7500, 10000]
         ),
-        "learning_starts": learning_starts,
+        "learning_starts": trial.suggest_categorical(
+            "learning_starts", [500, 1000, 2000, 3000, 4000, 5000, 8000, 10000]
+        ),
         "net_arch": trial.suggest_categorical(
             "net_arch", ["small", "medium", "large", "extra_large"]
         ),
         "activation_fn": trial.suggest_categorical(
             "activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
         ),
-        "optimizer_class": trial.suggest_categorical("optimizer_class", ["adam"]),
+        "optimizer_class": trial.suggest_categorical(
+            "optimizer_class", ["adamw", "rmsprop"]
+        ),
     }
 
 
diff --git a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
index 1ca6ee0..f618c7b 100644
--- a/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
+++ b/quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py
@@ -751,7 +751,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
     - For n_samples==1, returns [0.0].
    - Raises ValueError if matrix is not 2D, has 0 features, contains non-finite
       values, or if weights are invalid or incompatible with the metric.
-    - Memory usage: O(n²/2) for the condensed distance vector (vs O(n²) for full matrix).
+    - Memory usage: O(n²/2) for the condensed distance vector.
     - Time complexity: O(n² × d) where d is the number of features.
 
     Example:
-- 
2.43.0
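
Note (not part of the patch): for anyone trying the new model out, the excerpt below sketches how RecurrentPPO might be selected from a freqtrade config. It assumes freqtrade's standard `rl_config` keys (`model_type`, `policy_type`) and sb3-contrib's `MlpLstmPolicy` recurrent policy name; the `policy_kwargs` values merely restate the defaults this patch applies (`lstm_hidden_size=256`, `n_lstm_layers=1`), so they can be omitted.

```json
{
  "freqai": {
    "rl_config": {
      "model_type": "RecurrentPPO",
      "policy_type": "MlpLstmPolicy",
      "n_envs": 4
    },
    "model_training_parameters": {
      "policy_kwargs": {
        "lstm_hidden_size": 256,
        "n_lstm_layers": 1
      }
    }
  }
}
```

During inference the patch then carries the LSTM hidden state across calls to `model.predict()` (the `lstm_states`/`episode_start` handling above), which is what distinguishes the recurrent path from the stateless PPO/DQN predict loop.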