]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
feat(reforcexy): add support for RecurrentPPO model
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 12 Oct 2025 14:19:33 +0000 (16:19 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 12 Oct 2025 14:19:33 +0000 (16:19 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
README.md
ReforceXY/user_data/freqaimodels/ReforceXY.py
quickadapter/user_data/freqaimodels/QuickAdapterRegressorV3.py

index c8837f4f092834188441006a17277814ad686241..a807d5eec62b60c6948d664be527f1870f332074 100644 (file)
--- a/README.md
+++ b/README.md
@@ -100,6 +100,10 @@ Then build and start the container:
 docker compose up -d --build
 ```
 
+### Supported models
+
+PPO, MaskablePPO, RecurrentPPO, DQN, QRDQN
+
 ### Configuration tunables
 
 The documented list of model tunables is at the top of the [ReforceXY.py](./ReforceXY/user_data/freqaimodels/ReforceXY.py) file.
index 1388032393dc9437002bb35ef61a5362bd0c7b04..0252b4d18c521124eaa39bc139cefc37e0339f96 100644 (file)
@@ -5,7 +5,7 @@ import logging
 import math
 import time
 import warnings
-from collections import defaultdict
+from collections import defaultdict, deque
 from collections.abc import Mapping
 from pathlib import Path
 from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Type, Union
@@ -125,11 +125,10 @@ class ReforceXY(BaseReinforcementLearningModel):
             raise ValueError(
                 "FreqAI model requires StaticPairList method defined in pairlists configuration and pair_whitelist defined in exchange section configuration"
             )
-        self.action_masking: bool = (
-            self.model_type == "MaskablePPO"
-        )  # Enable action masking
+        self.action_masking: bool = self.model_type == "MaskablePPO"
         self.rl_config.setdefault("action_masking", self.action_masking)
         self.inference_masking: bool = self.rl_config.get("inference_masking", True)
+        self.recurrent: bool = self.model_type == "RecurrentPPO"
         self.lr_schedule: bool = self.rl_config.get("lr_schedule", False)
         self.cr_schedule: bool = self.rl_config.get("cr_schedule", False)
         self.n_envs: int = self.rl_config.get("n_envs", 1)
@@ -354,8 +353,7 @@ class ReforceXY(BaseReinforcementLearningModel):
 
         model_params: Dict[str, Any] = copy.deepcopy(self.model_training_parameters)
 
-        if model_params.get("seed") is None:
-            model_params["seed"] = 42
+        model_params.setdefault("seed", 42)
 
         if not self.hyperopt and self.lr_schedule:
             lr = model_params.get("learning_rate", 0.0003)
@@ -438,8 +436,11 @@ class ReforceXY(BaseReinforcementLearningModel):
             model_params.get("policy_kwargs", {}).get("activation_fn", "relu")
         )
         model_params["policy_kwargs"]["optimizer_class"] = get_optimizer_class(
-            model_params.get("policy_kwargs", {}).get("optimizer_class", "adam")
+            model_params.get("policy_kwargs", {}).get("optimizer_class", "adamw")
         )
+        if "RecurrentPPO" in self.model_type:
+            model_params["policy_kwargs"].setdefault("lstm_hidden_size", 256)
+            model_params["policy_kwargs"].setdefault("n_lstm_layers", 1)
 
         self._model_params_cache = model_params
         return copy.deepcopy(self._model_params_cache)
@@ -610,6 +611,7 @@ class ReforceXY(BaseReinforcementLearningModel):
         logger.info("Eval multiprocessing: %s", self.eval_multiprocessing)
         logger.info("Frame stacking: %s", self.frame_stacking)
         logger.info("Action masking: %s", self.action_masking)
+        logger.info("Recurrent: %s", self.recurrent)
         logger.info("Hyperopt: %s", self.hyperopt)
 
         start_time = time.time()
@@ -725,9 +727,14 @@ class ReforceXY(BaseReinforcementLearningModel):
         n = np_dataframe.shape[0]
         window_size: int = self.CONV_WIDTH
         frame_stacking: int = self.frame_stacking
-        frame_stacking_activated: bool = bool(frame_stacking) and frame_stacking > 1
+        frame_stacking_enabled: bool = bool(frame_stacking) and frame_stacking > 1
         inference_masking: bool = self.action_masking and self.inference_masking
 
+        if window_size <= 0 or n < window_size:
+            return DataFrame(
+                {label: [np.nan] * n for label in dk.label_list}, index=dataframe.index
+            )
+
         def _update_virtual_position(action: int, position: Positions) -> Positions:
             if action == Actions.Long_enter.value and position == Positions.Neutral:
                 return Positions.Long
@@ -751,11 +758,13 @@ class ReforceXY(BaseReinforcementLearningModel):
                     return current_virtual_trade_duration + 1
             return 0
 
-        frame_buffer: List[NDArray[np.float32]] = []
+        frame_buffer = deque(maxlen=frame_stacking if frame_stacking_enabled else None)
         zero_frame: Optional[NDArray[np.float32]] = None
+        lstm_states: Optional[Tuple[NDArray[np.float32], NDArray[np.float32]]] = None
+        episode_start = np.array([True], dtype=bool)
 
         def _predict(start_idx: int) -> int:
-            nonlocal zero_frame
+            nonlocal zero_frame, lstm_states, episode_start
             end_idx: int = start_idx + window_size
             np_observation = np_dataframe[start_idx:end_idx, :]
             action_masks_param: Dict[str, Any] = {}
@@ -771,6 +780,7 @@ class ReforceXY(BaseReinforcementLearningModel):
                         ),
                         (np_observation.shape[0], 1),
                     )
+                    action_mask_position = position
                 else:
                     state_block = np.tile(
                         np.array(
@@ -783,20 +793,20 @@ class ReforceXY(BaseReinforcementLearningModel):
                         ),
                         (np_observation.shape[0], 1),
                     )
+                    action_mask_position = virtual_position
                 np_observation = np.concatenate([np_observation, state_block], axis=1)
+            else:
+                action_mask_position = virtual_position
 
-            fb: List[NDArray[np.float32]] = frame_buffer
-            if frame_stacking_activated:
-                fb.append(np_observation)
-                if len(fb) > frame_stacking:
-                    del fb[0 : len(fb) - frame_stacking]
-                if len(fb) < frame_stacking:
-                    pad_count = frame_stacking - len(fb)
+            if frame_stacking_enabled:
+                frame_buffer.append(np_observation)
+                if len(frame_buffer) < frame_stacking:
+                    pad_count = frame_stacking - len(frame_buffer)
                     if zero_frame is None:
                         zero_frame = np.zeros_like(np_observation, dtype=np.float32)
-                    fb_padded = [zero_frame] * pad_count + fb
+                    fb_padded = [zero_frame] * pad_count + list(frame_buffer)
                 else:
-                    fb_padded = fb
+                    fb_padded = list(frame_buffer)
                 stacked_observations = np.concatenate(fb_padded, axis=1)
                 observations = stacked_observations.reshape(
                     1, stacked_observations.shape[0], stacked_observations.shape[1]
@@ -808,12 +818,23 @@ class ReforceXY(BaseReinforcementLearningModel):
 
             if inference_masking:
                 action_masks_param["action_masks"] = ReforceXY.get_action_masks(
-                    self.can_short, virtual_position
+                    self.can_short, action_mask_position
+                )
+
+            if self.recurrent:
+                action, lstm_states = model.predict(
+                    observations,
+                    state=lstm_states,
+                    episode_start=episode_start,
+                    deterministic=True,
+                    **action_masks_param,
+                )
+                episode_start[:] = False
+            else:
+                action, _ = model.predict(
+                    observations, deterministic=True, **action_masks_param
                 )
 
-            action, _ = model.predict(
-                observations, deterministic=True, **action_masks_param
-            )
             return int(action)
 
         predicted_actions: List[int] = []
@@ -830,9 +851,9 @@ class ReforceXY(BaseReinforcementLearningModel):
 
         pad_count = max(0, n - len(predicted_actions))
         actions_list = ([np.nan] * pad_count) + predicted_actions
-        actions = DataFrame({"action": actions_list}, index=dataframe.index)
+        actions_df = DataFrame({"action": actions_list}, index=dataframe.index)
 
-        return DataFrame({label: actions["action"] for label in dk.label_list})
+        return DataFrame({label: actions_df["action"] for label in dk.label_list})
 
     @staticmethod
     def study_delete(study_name: str, storage: BaseStorage) -> None:
@@ -1123,27 +1144,39 @@ class ReforceXY(BaseReinforcementLearningModel):
 
         return train_env, eval_env
 
+    def get_optuna_params(self, trial: Trial) -> Dict[str, Any]:
+        if "RecurrentPPO" in self.model_type:
+            return sample_params_recurrentppo(trial)
+        elif "PPO" in self.model_type:
+            return sample_params_ppo(trial)
+        elif "QRDQN" in self.model_type:
+            return sample_params_qrdqn(trial)
+        elif "DQN" in self.model_type:
+            return sample_params_dqn(trial)
+        else:
+            raise NotImplementedError(f"{self.model_type} not supported for hyperopt")
+
     def objective(
         self, trial: Trial, dk: FreqaiDataKitchen, total_timesteps: int
     ) -> float:
         """
-        Defines a single trial for hyperparameter optimization using Optuna
+        Objective function for Optuna trials hyperparameter optimization
         """
         logger.info("------------ Hyperopt trial %d ------------", trial.number)
 
+        params = self.get_optuna_params(trial)
+
         if "PPO" in self.model_type:
-            params = sample_params_ppo(trial, self.n_envs)
-            n_steps = params.get("n_steps", 0)
+            n_steps = params.get("n_steps")
             if n_steps * self.n_envs > total_timesteps:
                 raise TrialPruned(
                     f"{n_steps=} * n_envs={self.n_envs}={n_steps * self.n_envs} is greater than {total_timesteps=}"
                 )
-        elif "QRDQN" in self.model_type:
-            params = sample_params_qrdqn(trial)
-        elif "DQN" in self.model_type:
-            params = sample_params_dqn(trial)
-        else:
-            raise NotImplementedError
+            batch_size = params.get("batch_size")
+            if (n_steps * self.n_envs) % batch_size != 0:
+                raise TrialPruned(
+                    f"{n_steps=} * {self.n_envs=} = {n_steps * self.n_envs} is not divisible by {batch_size=}"
+                )
 
         if "DQN" in self.model_type:
             gradient_steps = params.get("gradient_steps")
@@ -1155,6 +1188,9 @@ class ReforceXY(BaseReinforcementLearningModel):
                 raise TrialPruned(
                     f"{batch_size=} * {gradient_steps=}={batch_size * gradient_steps} is greater than {buffer_size=}"
                 )
+            learning_starts = params.get("learning_starts")
+            if learning_starts > buffer_size:
+                raise TrialPruned(f"{learning_starts=} is greater than {buffer_size=}")
 
         # Ensure that the sampled parameters take precedence
         params = deepmerge(self.get_model_params(), params)
@@ -2142,7 +2178,7 @@ class InfoMetricsCallback(TensorboardCallback):
         train_freq: Optional[Union[TrainFreq, int, Tuple[int, ...], List[int]]],
     ) -> Optional[int]:
         train_freq_val: Optional[int] = None
-        if isinstance(train_freq, TrainFreq):
+        if isinstance(train_freq, TrainFreq) and hasattr(train_freq, "frequency"):
             if isinstance(train_freq.frequency, int):
                 train_freq_val = train_freq.frequency
         elif isinstance(train_freq, (tuple, list)) and train_freq:
@@ -2202,6 +2238,17 @@ class InfoMetricsCallback(TensorboardCallback):
             )
             if getattr(self.model, "target_kl", None) is not None:
                 hparam_dict["target_kl"] = float(self.model.target_kl)
+            if "RecurrentPPO" in self.model.__class__.__name__:
+                policy = getattr(self.model, "policy", None)
+                if policy is not None:
+                    lstm_actor = getattr(policy, "lstm_actor", None)
+                    if lstm_actor is not None:
+                        hparam_dict.update(
+                            {
+                                "lstm_hidden_size": int(lstm_actor.hidden_size),
+                                "n_lstm_layers": int(lstm_actor.num_layers),
+                            }
+                        )
         if "DQN" in self.model.__class__.__name__:
             hparam_dict.update(
                 {
@@ -2269,13 +2316,13 @@ class InfoMetricsCallback(TensorboardCallback):
 
         logger_exclude = ("stdout", "log", "json", "csv")
 
-        def _is_numeric_non_bool(x: Any) -> bool:
+        def _is_number(x: Any) -> bool:
             return isinstance(
                 x, (int, float, np.integer, np.floating)
             ) and not isinstance(x, bool)
 
         def _is_finite_number(x: Any) -> bool:
-            if not _is_numeric_non_bool(x):
+            if not _is_number(x):
                 return False
             try:
                 return np.isfinite(float(x))
@@ -2300,7 +2347,7 @@ class InfoMetricsCallback(TensorboardCallback):
                         continue
                     if _is_finite_number(v):
                         numeric_acc[k].append(float(v))
-                    elif _is_numeric_non_bool(v):
+                    elif _is_number(v):
                         filtered_values += 1
                     else:
                         non_numeric_counts[k][v] += 1
@@ -2801,7 +2848,7 @@ def get_activation_fn(
 
 
 def get_optimizer_class(
-    optimizer_class_name: Literal["adam", "adamw"],
+    optimizer_class_name: Literal["adam", "adamw", "rmsprop"],
 ) -> Type[th.optim.Optimizer]:
     """
     Get optimizer class
@@ -2809,6 +2856,7 @@ def get_optimizer_class(
     return {
         "adam": th.optim.Adam,
         "adamw": th.optim.AdamW,
+        "rmsprop": th.optim.RMSprop,
     }.get(optimizer_class_name, th.optim.Adam)
 
 
@@ -2857,6 +2905,11 @@ def convert_optuna_params_to_model_params(
         )
         if optuna_params.get("target_kl") is not None:
             model_params["target_kl"] = float(optuna_params.get("target_kl"))
+        if "RecurrentPPO" in model_type:
+            policy_kwargs["lstm_hidden_size"] = int(
+                optuna_params.get("lstm_hidden_size")
+            )
+            policy_kwargs["n_lstm_layers"] = int(optuna_params.get("n_lstm_layers"))
     elif "DQN" in model_type:
         required_dqn_params = [
             "gamma",
@@ -2927,50 +2980,63 @@ def convert_optuna_params_to_model_params(
 PPO_N_STEPS: Tuple[int, ...] = (512, 1024, 2048, 4096)
 
 
-def sample_params_ppo(trial: Trial, n_envs: int) -> Dict[str, Any]:
+def get_common_ppo_optuna_params(trial: Trial) -> Dict[str, Any]:
+    return {
+        "n_steps": trial.suggest_categorical("n_steps", list(PPO_N_STEPS)),
+        "batch_size": trial.suggest_categorical(
+            "batch_size", [64, 128, 256, 512, 1024]
+        ),
+        "gamma": trial.suggest_categorical(
+            "gamma", [0.93, 0.95, 0.97, 0.98, 0.99, 0.995, 0.997, 0.999, 0.9999]
+        ),
+        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 3e-3, log=True),
+        "ent_coef": trial.suggest_float("ent_coef", 0.0005, 0.03, log=True),
+        "clip_range": trial.suggest_float("clip_range", 0.1, 0.4, step=0.05),
+        "n_epochs": trial.suggest_int("n_epochs", 1, 5),
+        "gae_lambda": trial.suggest_float("gae_lambda", 0.9, 0.99, step=0.01),
+        "max_grad_norm": trial.suggest_float("max_grad_norm", 0.3, 1.0, step=0.05),
+        "vf_coef": trial.suggest_float("vf_coef", 0.0, 1.0, step=0.05),
+        "lr_schedule": trial.suggest_categorical("lr_schedule", ["linear", "constant"]),
+        "cr_schedule": trial.suggest_categorical("cr_schedule", ["linear", "constant"]),
+        "target_kl": trial.suggest_categorical(
+            "target_kl", [None, 0.01, 0.015, 0.02, 0.03, 0.04]
+        ),
+        "ortho_init": trial.suggest_categorical("ortho_init", [True, False]),
+        "net_arch": trial.suggest_categorical(
+            "net_arch", ["small", "medium", "large", "extra_large"]
+        ),
+        "activation_fn": trial.suggest_categorical(
+            "activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
+        ),
+        "optimizer_class": trial.suggest_categorical(
+            "optimizer_class", ["adamw", "rmsprop"]
+        ),
+    }
+
+
+def sample_params_ppo(trial: Trial) -> Dict[str, Any]:
     """
     Sampler for PPO hyperparams
     """
-    n_steps = trial.suggest_categorical("n_steps", list(PPO_N_STEPS))
-    batch_size = trial.suggest_categorical("batch_size", [64, 128, 256, 512, 1024])
-    if (n_steps * n_envs) % batch_size != 0:
-        raise TrialPruned(
-            f"{n_steps=} * {n_envs=} = {n_steps * n_envs} is not divisible by {batch_size=}"
-        )
     return convert_optuna_params_to_model_params(
-        "PPO",
+        "PPO", get_common_ppo_optuna_params(trial)
+    )
+
+
+def sample_params_recurrentppo(trial: Trial) -> Dict[str, Any]:
+    """
+    Sampler for RecurrentPPO hyperparams
+    """
+    ppo_optuna_params = get_common_ppo_optuna_params(trial)
+    ppo_optuna_params.update(
         {
-            "n_steps": n_steps,
-            "batch_size": batch_size,
-            "gamma": trial.suggest_categorical(
-                "gamma", [0.93, 0.95, 0.97, 0.98, 0.99, 0.995, 0.997, 0.999, 0.9999]
-            ),
-            "learning_rate": trial.suggest_float("learning_rate", 1e-5, 3e-3, log=True),
-            "ent_coef": trial.suggest_float("ent_coef", 0.0005, 0.03, log=True),
-            "clip_range": trial.suggest_float("clip_range", 0.1, 0.4, step=0.05),
-            "n_epochs": trial.suggest_categorical("n_epochs", [1, 2, 3, 4, 5]),
-            "gae_lambda": trial.suggest_float("gae_lambda", 0.9, 0.99, step=0.01),
-            "max_grad_norm": trial.suggest_float("max_grad_norm", 0.3, 1.0, step=0.05),
-            "vf_coef": trial.suggest_float("vf_coef", 0.0, 1.0, step=0.05),
-            "lr_schedule": trial.suggest_categorical(
-                "lr_schedule", ["linear", "constant"]
-            ),
-            "cr_schedule": trial.suggest_categorical(
-                "cr_schedule", ["linear", "constant"]
+            "lstm_hidden_size": trial.suggest_categorical(
+                "lstm_hidden_size", [64, 128, 256, 512]
             ),
-            "target_kl": trial.suggest_categorical(
-                "target_kl", [None, 0.01, 0.015, 0.02, 0.03, 0.04]
-            ),
-            "ortho_init": trial.suggest_categorical("ortho_init", [False, True]),
-            "net_arch": trial.suggest_categorical(
-                "net_arch", ["small", "medium", "large", "extra_large"]
-            ),
-            "activation_fn": trial.suggest_categorical(
-                "activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
-            ),
-            "optimizer_class": trial.suggest_categorical("optimizer_class", ["adamw"]),
-        },
+            "n_lstm_layers": trial.suggest_int("n_lstm_layers", 1, 2),
+        }
     )
+    return convert_optuna_params_to_model_params("RecurrentPPO", ppo_optuna_params)
 
 
 def get_common_dqn_optuna_params(trial: Trial) -> Dict[str, Any]:
@@ -2986,17 +3052,6 @@ def get_common_dqn_optuna_params(trial: Trial) -> Dict[str, Any]:
         min_fraction = 0.15
     else:
         min_fraction = 0.05
-    exploration_fraction = trial.suggest_float(
-        "exploration_fraction", min_fraction, 0.9, step=0.02
-    )
-    buffer_size = trial.suggest_categorical(
-        "buffer_size", [int(1e4), int(5e4), int(1e5), int(2e5)]
-    )
-    learning_starts = trial.suggest_categorical(
-        "learning_starts", [500, 1000, 2000, 3000, 4000, 5000, 8000, 10000]
-    )
-    if learning_starts > buffer_size:
-        raise TrialPruned(f"{learning_starts=} is greater than {buffer_size=}")
     return {
         "train_freq": trial.suggest_categorical(
             "train_freq", [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]
@@ -3010,21 +3065,29 @@ def get_common_dqn_optuna_params(trial: Trial) -> Dict[str, Any]:
         ),
         "learning_rate": trial.suggest_float("learning_rate", 1e-5, 3e-3, log=True),
         "lr_schedule": trial.suggest_categorical("lr_schedule", ["linear", "constant"]),
-        "buffer_size": buffer_size,
+        "buffer_size": trial.suggest_categorical(
+            "buffer_size", [int(1e4), int(5e4), int(1e5), int(2e5)]
+        ),
         "exploration_initial_eps": exploration_initial_eps,
         "exploration_final_eps": exploration_final_eps,
-        "exploration_fraction": exploration_fraction,
+        "exploration_fraction": trial.suggest_float(
+            "exploration_fraction", min_fraction, 0.9, step=0.02
+        ),
         "target_update_interval": trial.suggest_categorical(
             "target_update_interval", [1000, 2000, 5000, 7500, 10000]
         ),
-        "learning_starts": learning_starts,
+        "learning_starts": trial.suggest_categorical(
+            "learning_starts", [500, 1000, 2000, 3000, 4000, 5000, 8000, 10000]
+        ),
         "net_arch": trial.suggest_categorical(
             "net_arch", ["small", "medium", "large", "extra_large"]
         ),
         "activation_fn": trial.suggest_categorical(
             "activation_fn", ["tanh", "relu", "elu", "leaky_relu"]
         ),
-        "optimizer_class": trial.suggest_categorical("optimizer_class", ["adam"]),
+        "optimizer_class": trial.suggest_categorical(
+            "optimizer_class", ["adamw", "rmsprop"]
+        ),
     }
 
 
index 1ca6ee0563bc25b8f180ae87695a10cb5e8cc458..f618c7b9cfdd8adc7beccd29da61adf97d88cfeb 100644 (file)
@@ -751,7 +751,7 @@ class QuickAdapterRegressorV3(BaseRegressionModel):
         - For n_samples==1, returns [0.0].
         - Raises ValueError if matrix is not 2D, has 0 features, contains non-finite values,
           or if weights are invalid or incompatible with the metric.
-        - Memory usage: O(n²/2) for the condensed distance vector (vs O(n²) for full matrix).
+        - Memory usage: O(n²/2) for the condensed distance vector.
         - Time complexity: O(n² × d) where d is the number of features.
 
         Example: