]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
perf(reforcexy): optimize prediction code path
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 28 Sep 2025 14:00:21 +0000 (16:00 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 28 Sep 2025 14:00:21 +0000 (16:00 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
ReforceXY/user_data/freqaimodels/ReforceXY.py

index a79f1e4e79965a915e0794aee7a53c15aa0d2041..b3c3a6e58165a105c5fc3d9a225cd8e8d245d49e 100644 (file)
@@ -287,6 +287,13 @@ class ReforceXY(BaseReinforcementLearningModel):
                 self.n_eval_episodes,
             )
             self.n_eval_episodes = 5
+        tensorboard_throttle = self.rl_config.get("tensorboard_throttle", 1)
+        if not isinstance(tensorboard_throttle, int) or tensorboard_throttle < 1:
+            logger.warning(
+                "Invalid tensorboard_throttle=%s. Forcing tensorboard_throttle=1",
+                tensorboard_throttle,
+            )
+            self.rl_config["tensorboard_throttle"] = 1
         if self.continual_learning and self.frame_stacking:
             logger.warning(
                 "User tried to use continual_learning with frame_stacking. \
@@ -456,7 +463,7 @@ class ReforceXY(BaseReinforcementLearningModel):
         if total_timesteps <= 0:
             return 1
         if "PPO" in self.model_type:
-            eval_freq = None
+            eval_freq: Optional[int] = None
             if model_params:
                 n_steps = model_params.get("n_steps")
                 if isinstance(n_steps, int) and n_steps > 0:
@@ -471,11 +478,10 @@ class ReforceXY(BaseReinforcementLearningModel):
                     PPO_N_STEPS[0],
                 )
         else:
-            if hyperopt and hyperopt_reduction_factor > 1.0:
-                eval_freq = int(self.n_eval_steps / hyperopt_reduction_factor)
-            else:
-                eval_freq = self.n_eval_steps
-            eval_freq = max(1, (eval_freq + self.n_envs - 1) // self.n_envs)
+            eval_freq = max(1, (self.n_eval_steps + self.n_envs - 1) // self.n_envs)
+
+        if hyperopt and hyperopt_reduction_factor > 1.0:
+            eval_freq = max(1, int(round(eval_freq / hyperopt_reduction_factor)))
 
         return min(eval_freq, total_timesteps)
 
@@ -581,9 +587,10 @@ class ReforceXY(BaseReinforcementLearningModel):
             total_days,
         )
         logger.info(
-            "Eval: %s steps (%s days), %s env(s)",
+            "Eval: %s steps (%s days), %s episodes, %s env(s)",
             eval_timesteps,
             eval_days,
+            self.n_eval_episodes,
             self.n_eval_envs,
         )
         logger.info("Multiprocessing: %s", self.multiprocessing)
@@ -694,6 +701,16 @@ class ReforceXY(BaseReinforcementLearningModel):
         """
 
         virtual_position: Positions = Positions.Neutral
+        np_dataframe: NDArray[np.float32] = dataframe.to_numpy(
+            dtype=np.float32, copy=False
+        )
+        n = int(np_dataframe.shape[0])
+        window_length = int(self.CONV_WIDTH)
+        if self.rl_config.get("add_state_info", False) and not self.live:
+            static_state_block = np.tile(
+                np.array([0.0, float(Positions.Neutral.value), 0.0], dtype=np.float32),
+                (window_length, 1),
+            )
 
         def _update_virtual_position(action: int, position: Positions) -> Positions:
             if action == Actions.Long_enter.value and position == Positions.Neutral:
@@ -707,36 +724,40 @@ class ReforceXY(BaseReinforcementLearningModel):
             return position
 
         frame_buffer: List[NDArray[np.float32]] = []
+        zero_frame: Optional[NDArray[np.float32]] = None
 
-        def _predict(window) -> int:
-            observation: DataFrame = dataframe.iloc[window.index]
+        def _predict(start_idx: int) -> int:
+            nonlocal zero_frame
+            end_idx = start_idx + window_length
+            np_observation = np_dataframe[start_idx:end_idx, :]
             action_masks_param: Dict[str, Any] = {}
 
             if self.rl_config.get("add_state_info", False):
                 if self.live:
                     position, pnl, trade_duration = self.get_state_info(dk.pair)
+                    position = ReforceXY._normalize_position(position)
+                    state_block = np.tile(
+                        np.array(
+                            [float(pnl), float(position.value), float(trade_duration)],
+                            dtype=np.float32,
+                        ),
+                        (np_observation.shape[0], 1),
+                    )
                 else:
-                    position = Positions.Neutral
-                    pnl = 0.0
-                    trade_duration = 0
-
-                # STATE_INFO
-                observation["pnl"] = pnl
-                observation["position"] = position
-                observation["trade_duration"] = trade_duration
-
-            np_observation = observation.to_numpy(dtype=np.float32)
+                    state_block = static_state_block
+                np_observation = np.concatenate([np_observation, state_block], axis=1)
 
             fb: List[NDArray[np.float32]] = frame_buffer
             frame_stacking = self.frame_stacking
             if frame_stacking and frame_stacking > 1:
-                fb.append(np_observation.copy())
+                fb.append(np_observation)
                 if len(fb) > frame_stacking:
                     del fb[0 : len(fb) - frame_stacking]
                 if len(fb) < frame_stacking:
                     pad_count = frame_stacking - len(fb)
-                    pad_frame = np.zeros_like(np_observation, dtype=np.float32)
-                    fb_padded = [pad_frame] * pad_count + fb
+                    if zero_frame is None:
+                        zero_frame = np.zeros_like(np_observation, dtype=np.float32)
+                    fb_padded = [zero_frame] * pad_count + fb
                 else:
                     fb_padded = fb
                 stacked_observations = np.concatenate(fb_padded, axis=1)
@@ -759,14 +780,13 @@ class ReforceXY(BaseReinforcementLearningModel):
             return int(action)
 
         predicted_actions: List[int] = []
-        for window_end in range(self.CONV_WIDTH, len(dataframe) + 1):
-            window = dataframe.iloc[window_end - self.CONV_WIDTH : window_end]
-            action = _predict(window)
+        for start_idx in range(0, n - window_length + 1):
+            action = _predict(start_idx)
             predicted_actions.append(action)
             virtual_position = _update_virtual_position(action, virtual_position)
 
-        pad = [np.nan] * (self.CONV_WIDTH - 1)
-        actions_list = pad + predicted_actions
+        pad_count = n - len(predicted_actions)
+        actions_list = ([np.nan] * pad_count) + predicted_actions
         actions = DataFrame({"action": actions_list}, index=dataframe.index)
 
         return DataFrame({label: actions["action"] for label in dk.label_list})
@@ -827,9 +847,9 @@ class ReforceXY(BaseReinforcementLearningModel):
             resource_eval_freq = self.get_eval_freq(total_timesteps, hyperopt=True)
         reduction_factor = 3
         max_resource = max(
-            reduction_factor * 2, total_timesteps // (resource_eval_freq * self.n_envs)
+            reduction_factor * 2, (total_timesteps // self.n_envs) // resource_eval_freq
         )
-        min_resource = min(reduction_factor, max_resource // reduction_factor)
+        min_resource = max(1, max_resource // reduction_factor)
         study: Study = create_study(
             study_name=study_name,
             sampler=TPESampler(
@@ -1053,8 +1073,11 @@ class ReforceXY(BaseReinforcementLearningModel):
 
         if "PPO" in self.model_type:
             params = sample_params_ppo(trial, self.n_envs)
-            if params.get("n_steps", 0) * self.n_envs > total_timesteps:
-                raise TrialPruned("n_steps * n_envs is greater than total_timesteps")
+            n_steps = params.get("n_steps", 0)
+            if n_steps * self.n_envs > total_timesteps:
+                raise TrialPruned(
+                    f"{n_steps=} * n_envs={self.n_envs}={n_steps * self.n_envs} is greater than {total_timesteps=}"
+                )
         elif "QRDQN" in self.model_type:
             params = sample_params_qrdqn(trial)
         elif "DQN" in self.model_type:
@@ -1065,12 +1088,12 @@ class ReforceXY(BaseReinforcementLearningModel):
         if "DQN" in self.model_type:
             gradient_steps = params.get("gradient_steps")
             if isinstance(gradient_steps, int) and gradient_steps <= 0:
-                raise TrialPruned("gradient_steps is negative or zero")
+                raise TrialPruned(f"{gradient_steps=} is negative or zero")
             batch_size = params.get("batch_size")
             buffer_size = params.get("buffer_size")
             if (batch_size * gradient_steps) > buffer_size:
                 raise TrialPruned(
-                    "batch_size * gradient_steps is greater than buffer_size"
+                    f"{batch_size=} * {gradient_steps=}={batch_size * gradient_steps} is greater than {buffer_size=}"
                 )
 
         # Ensure that the sampled parameters take precedence
@@ -1275,7 +1298,7 @@ class MyRLEnv(Base5ActionRLEnv):
         self._last_closed_trade_tick: int = 0
         return observation, history
 
-    def _get_exit_reward_factor(
+    def _get_exit_factor(
         self,
         factor: float,
         pnl: float,
@@ -1423,7 +1446,7 @@ class MyRLEnv(Base5ActionRLEnv):
             ForceActions.Stop_loss,
             ForceActions.Timeout,
         ):
-            return pnl * self._get_exit_reward_factor(base_factor, pnl, duration_ratio)
+            return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
 
         # # you can use feature values from dataframe
         # rsi_now = self.get_feature_value(
@@ -1507,11 +1530,11 @@ class MyRLEnv(Base5ActionRLEnv):
 
         # close long
         if action == Actions.Long_exit.value and self._position == Positions.Long:
-            return pnl * self._get_exit_reward_factor(base_factor, pnl, duration_ratio)
+            return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
 
         # close short
         if action == Actions.Short_exit.value and self._position == Positions.Short:
-            return pnl * self._get_exit_reward_factor(base_factor, pnl, duration_ratio)
+            return pnl * self._get_exit_factor(base_factor, pnl, duration_ratio)
 
         return 0.0
 
@@ -1832,21 +1855,18 @@ class MyRLEnv(Base5ActionRLEnv):
             logger.warning("history is empty")
             return DataFrame()
 
-        _history_df = DataFrame.from_dict(self.history)
+        _history_df = DataFrame(self.history)
         if "tick" not in _history_df.columns:
             logger.warning("'tick' column is missing from history")
             return DataFrame()
 
+        _rollout_history = _history_df.copy()
         if self.trade_history:
-            _trade_history_df = DataFrame.from_dict(self.trade_history)
+            _trade_history_df = DataFrame(self.trade_history)
             if "tick" in _trade_history_df.columns:
                 _rollout_history = merge(
-                    _history_df, _trade_history_df, on="tick", how="left"
+                    _rollout_history, _trade_history_df, on="tick", how="left"
                 )
-            else:
-                _rollout_history = _history_df.copy()
-        else:
-            _rollout_history = _history_df.copy()
 
         try:
             history = merge(
@@ -1898,7 +1918,7 @@ class MyRLEnv(Base5ActionRLEnv):
             if len(history) == 0:
                 return fig
 
-            plot_window = int(self.rl_config.get("plot_window", 2000))
+            plot_window = self.rl_config.get("plot_window", 2000)
             if plot_window > 0 and len(history) > plot_window:
                 history = history.iloc[-plot_window:]
 
@@ -2091,6 +2111,14 @@ class InfoMetricsCallback(TensorboardCallback):
             "gamma": float(self.model.gamma),
             "batch_size": int(self.model.batch_size),
         }
+        try:
+            n_updates = getattr(self.model, "n_updates", None)
+            if n_updates is None:
+                n_updates = getattr(self.model, "_n_updates", None)
+            if isinstance(n_updates, (int, float)) and np.isfinite(n_updates):
+                hparam_dict.update({"n_updates": int(n_updates)})
+        except Exception:
+            pass
         if "PPO" in self.model.__class__.__name__:
             cr = getattr(self.model, "clip_range", None)
             cr_schedule, cr_iv, cr_fv = get_schedule_type(cr)
@@ -2174,6 +2202,8 @@ class InfoMetricsCallback(TensorboardCallback):
         if self.throttle > 1 and (self.num_timesteps % self.throttle) != 0:
             return True
 
+        logger_exclude = ("stdout", "log", "json", "csv")
+
         def _is_numeric_non_bool(x: Any) -> bool:
             return isinstance(
                 x, (int, float, np.integer, np.floating)
@@ -2249,7 +2279,6 @@ class InfoMetricsCallback(TensorboardCallback):
                 else:
                     aggregated_info[f"{k}_mode"] = "mixed"
 
-            logger_exclude = ("stdout", "log", "json", "csv")
             self._safe_logger_record(
                 "info/n_envs", int(len(infos_list)), exclude=logger_exclude
             )
@@ -2313,6 +2342,7 @@ class InfoMetricsCallback(TensorboardCallback):
             for k, counts in cat_counts.items():
                 cat_total = max(1, int(cat_totals.get(k, 0)))
                 for name, cnt in counts.items():
+                    name = str(name)
                     self._safe_logger_record(
                         f"info/{k}/{name}_count", int(cnt), exclude=logger_exclude
                     )
@@ -2359,6 +2389,17 @@ class InfoMetricsCallback(TensorboardCallback):
         except Exception:
             progress_remaining = 1.0
 
+        try:
+            n_updates = getattr(self.model, "n_updates", None)
+            if n_updates is None:
+                n_updates = getattr(self.model, "_n_updates", None)
+            if _is_finite_number(n_updates):
+                self._safe_logger_record(
+                    "train/n_updates", float(n_updates), exclude=logger_exclude
+                )
+        except Exception:
+            pass
+
         def _eval_schedule(schedule: Any) -> float | None:
             schedule_type, _, _ = get_schedule_type(schedule)
             try:
@@ -2394,6 +2435,16 @@ class InfoMetricsCallback(TensorboardCallback):
             except Exception:
                 pass
 
+        if "DQN" in self.model.__class__.__name__:
+            try:
+                er = getattr(self.model, "exploration_rate", None)
+                if _is_finite_number(er):
+                    self._safe_logger_record(
+                        "train/exploration_rate", float(er), exclude=logger_exclude
+                    )
+            except Exception:
+                pass
+
         return True
 
 
@@ -2496,23 +2547,31 @@ class MaskableTrialEvalCallback(MaskableEvalCallback):
                     self.eval_idx,
                     e,
                 )
-            if np.isfinite(best_mean_reward):
-                try:
+            try:
+                logger_exclude = ("stdout", "log", "json", "csv")
+                self.logger.record(
+                    "eval/idx",
+                    int(self.eval_idx),
+                    exclude=logger_exclude,
+                )
+                self.logger.record(
+                    "eval/last_mean_reward",
+                    last_mean_reward,
+                    exclude=logger_exclude,
+                )
+                if np.isfinite(best_mean_reward):
                     self.logger.record(
                         "eval/best_mean_reward",
                         best_mean_reward,
-                        exclude=("stdout", "log", "json", "csv"),
+                        exclude=logger_exclude,
                     )
-                except Exception as e:
-                    logger.error(
-                        "Optuna: logger.record failed at %r: %r",
-                        "eval/best_mean_reward",
-                        e,
+                else:
+                    logger.warning(
+                        "Optuna: non-finite best_mean_reward at eval %s", self.eval_idx
                     )
-                    pass
-            else:
-                logger.warning(
-                    "Optuna: non-finite best_mean_reward at eval %s", self.eval_idx
+            except Exception as e:
+                logger.error(
+                    "Optuna: logger.record failed at eval %s: %r", self.eval_idx, e
                 )
             try:
                 if self.trial.should_prune():
@@ -2780,10 +2839,10 @@ def sample_params_ppo(trial: Trial, n_envs: int) -> Dict[str, Any]:
     """
     n_steps = trial.suggest_categorical("n_steps", list(PPO_N_STEPS))
     batch_size = trial.suggest_categorical("batch_size", [64, 128, 256, 512, 1024])
-    if batch_size > n_steps:
-        raise TrialPruned("batch_size is greater than n_steps")
     if (n_steps * n_envs) % batch_size != 0:
-        raise TrialPruned("n_steps * n_envs is not divisible by batch_size")
+        raise TrialPruned(
+            f"{n_steps=} * {n_envs=} = {n_steps * n_envs} is not divisible by {batch_size=}"
+        )
     return convert_optuna_params_to_model_params(
         "PPO",
         {
@@ -2842,8 +2901,8 @@ def get_common_dqn_optuna_params(trial: Trial) -> Dict[str, Any]:
     learning_starts = trial.suggest_categorical(
         "learning_starts", [500, 1000, 2000, 3000, 4000, 5000, 8000, 10000]
     )
-    if learning_starts >= buffer_size:
-        raise TrialPruned("learning_starts is greater than or equal to buffer_size")
+    if learning_starts > buffer_size:
+        raise TrialPruned(f"{learning_starts=} is greater than {buffer_size=}")
     return {
         "train_freq": trial.suggest_categorical(
             "train_freq", [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]