From 28fc1db38f4fc890b9c10f8607a6559ec36d1a12 Mon Sep 17 00:00:00 2001
From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?=
Date: Mon, 22 Dec 2025 00:52:44 +0100
Subject: [PATCH] fix(ReforceXY): eval frequency calculation and resource
 allocation
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

Signed-off-by: Jérôme Benoit
---
 ReforceXY/user_data/freqaimodels/ReforceXY.py | 161 +++++++++++-------
 .../user_data/strategies/QuickAdapterV3.py    |   2 +-
 2 files changed, 103 insertions(+), 60 deletions(-)

diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py
index 183cb60..3086750 100644
--- a/ReforceXY/user_data/freqaimodels/ReforceXY.py
+++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py
@@ -241,6 +241,9 @@ class ReforceXY(BaseReinforcementLearningModel):
     _STORAGE_BACKENDS: Final[Tuple[StorageBackend, ...]] = ("sqlite", "file")
     _SAMPLER_TYPES: Final[Tuple[SamplerType, ...]] = ("tpe", "auto")
     _PPO_N_STEPS: Final[Tuple[int, ...]] = (512, 1024, 2048, 4096)
+    _PPO_N_STEPS_MIN: Final[int] = min(_PPO_N_STEPS)
+    _PPO_N_STEPS_MAX: Final[int] = max(_PPO_N_STEPS)
+    _HYPEROPT_EVAL_FREQ_REDUCTION_FACTOR: Final[float] = 4.0
 
     _action_masks_cache: ClassVar[Dict[Tuple[bool, float], NDArray[np.bool_]]] = {}
 
@@ -659,56 +662,51 @@ class ReforceXY(BaseReinforcementLearningModel):
         self,
         total_timesteps: int,
         hyperopt: bool = False,
-        hyperopt_reduction_factor: float = 4.0,
+        hyperopt_reduction_factor: float = _HYPEROPT_EVAL_FREQ_REDUCTION_FACTOR,
         model_params: Optional[Dict[str, Any]] = None,
     ) -> int:
-        """
-        Calculate evaluation frequency (number of steps between evaluations).
-
-        For PPO:
-        - Use n_steps from model_params if available
-        - Otherwise, select the largest value from ReforceXY._PPO_N_STEPS that is <= total_timesteps
-
-        For DQN:
-        - Use n_eval_steps divided by n_envs (rounded up)
+        """Calculate evaluation frequency.
 
-        For hyperopt:
-        - Reduce eval_freq by hyperopt_reduction_factor to speed up trials
+        Stable-Baselines3 triggers evaluation every `eval_freq` callback calls (`n_calls`). With
+        `n_envs > 1`, one callback call corresponds to `n_envs` timesteps, so this method works in
+        `n_calls` units.
 
-        Args:
-            total_timesteps: Total training timesteps
-            hyperopt: If True, reduce eval_freq for hyperopt
-            hyperopt_reduction_factor: Reduction factor for hyperopt (default: 4.0)
-            model_params: Model parameters (to get n_steps for PPO)
+        - PPO: prefer `n_steps`.
+        - DQN: use `ceil(n_eval_steps / n_envs)`.
+        - Hyperopt: optionally reduce interval.
 
         Returns:
-            int: Evaluation frequency (capped at total_timesteps)
+            Evaluation frequency in callback calls (`n_calls`).
""" if total_timesteps <= 0: return 1 + + n_envs = self.n_envs + max_n_calls = max(1, total_timesteps // n_envs) + # "PPO" if ReforceXY._MODEL_TYPES[0] in self.model_type: eval_freq: Optional[int] = None if model_params: n_steps = model_params.get("n_steps") if isinstance(n_steps, int) and n_steps > 0: - eval_freq = max(1, n_steps) + eval_freq = max(1, min(n_steps, max_n_calls)) if eval_freq is None: eval_freq = next( ( step for step in sorted(ReforceXY._PPO_N_STEPS, reverse=True) - if step <= total_timesteps + if step <= max_n_calls ), - ReforceXY._PPO_N_STEPS[0], + max_n_calls, ) else: - eval_freq = max(1, (self.n_eval_steps + self.n_envs - 1) // self.n_envs) + eval_freq = max(1, (self.n_eval_steps + n_envs - 1) // n_envs) if hyperopt and hyperopt_reduction_factor > 1.0: eval_freq = max(1, int(round(eval_freq / hyperopt_reduction_factor))) - return min(eval_freq, total_timesteps) + return min(eval_freq, max_n_calls) def get_callbacks( self, @@ -794,9 +792,9 @@ class ReforceXY(BaseReinforcementLearningModel): test_df = data_dictionary.get("test_features") eval_timesteps = len(test_df) train_cycles = max(1, int(self.rl_config.get("train_cycles", 25))) - total_timesteps = ( - (train_timesteps * train_cycles + self.n_envs - 1) // self.n_envs - ) * self.n_envs + total_timesteps = ReforceXY._ceil_to_multiple( + train_timesteps * train_cycles, self.n_envs + ) train_days = steps_to_days(train_timesteps, self.config.get("timeframe")) eval_days = steps_to_days(eval_timesteps, self.config.get("timeframe")) total_days = steps_to_days(total_timesteps, self.config.get("timeframe")) @@ -851,9 +849,9 @@ class ReforceXY(BaseReinforcementLearningModel): ) if n_steps > 0: rollout = n_steps * self.n_envs - aligned_total_timesteps = ( - (total_timesteps + rollout - 1) // rollout - ) * rollout + aligned_total_timesteps = ReforceXY._ceil_to_multiple( + total_timesteps, rollout + ) if aligned_total_timesteps != total_timesteps: total_timesteps = aligned_total_timesteps logger.info( @@ -1153,6 +1151,27 @@ class ReforceXY(BaseReinforcementLearningModel): reduction_factor=reduction_factor, ) + @staticmethod + def _ceil_to_multiple(value: int, multiple: int) -> int: + return ((value + multiple - 1) // multiple) * multiple + + @staticmethod + def _ppo_resources(total_timesteps: int, n_envs: int) -> Tuple[int, int]: + min_n_steps = ReforceXY._PPO_N_STEPS_MIN + max_n_steps = ReforceXY._PPO_N_STEPS_MAX + min_resource = ( + max( + 1, + round(min_n_steps / ReforceXY._HYPEROPT_EVAL_FREQ_REDUCTION_FACTOR), + ) + * n_envs + ) + rollout = max_n_steps * n_envs + return ( + min_resource, + max(min_resource, ReforceXY._ceil_to_multiple(total_timesteps, rollout)), + ) + def optimize( self, dk: FreqaiDataKitchen, total_timesteps: int ) -> Optional[Dict[str, Any]]: @@ -1165,16 +1184,16 @@ class ReforceXY(BaseReinforcementLearningModel): continuous = self.rl_config_optuna.get("continuous", False) if continuous: ReforceXY.delete_study(study_name, storage) - # "PPO" - if ReforceXY._MODEL_TYPES[0] in self.model_type: - resource_eval_freq = min(ReforceXY._PPO_N_STEPS) - else: - resource_eval_freq = self.get_eval_freq(total_timesteps, hyperopt=True) + reduction_factor = 3 - max_resource = max( - reduction_factor * 2, (total_timesteps // self.n_envs) // resource_eval_freq - ) - min_resource = max(1, max_resource // reduction_factor) + n_envs = self.n_envs + if ReforceXY._MODEL_TYPES[0] in self.model_type: # "PPO" + min_resource, max_resource = ReforceXY._ppo_resources( + total_timesteps, n_envs + ) + else: + min_resource = 
+            min_resource = self.get_eval_freq(total_timesteps, hyperopt=True) * n_envs
+            max_resource = max(min_resource, total_timesteps + (n_envs - 1))
 
         study: Study = create_study(
             study_name=study_name,
@@ -1435,9 +1454,9 @@ class ReforceXY(BaseReinforcementLearningModel):
         n_steps = params.get("n_steps", 0)
         if n_steps > 0:
             rollout = n_steps * self.n_envs
-            aligned_total_timesteps = (
-                (total_timesteps + rollout - 1) // rollout
-            ) * rollout
+            aligned_total_timesteps = ReforceXY._ceil_to_multiple(
+                total_timesteps, rollout
+            )
             if aligned_total_timesteps != total_timesteps:
                 total_timesteps = aligned_total_timesteps
 
@@ -2878,7 +2897,7 @@ class MyRLEnv(Base5ActionRLEnv):
             "duration_ratio": (
                 trade_duration / max(1, self.max_trade_duration_candles)
             ),
-            "trade_count": int(len(self.trade_history) // 2),
+            "trade_count": len(self.trade_history) // 2,
         }
         self._update_history(info)
         terminated = self.is_terminated()
@@ -3252,7 +3271,7 @@ class MyRLEnv(Base5ActionRLEnv):
         fig.suptitle(
             f"Total Reward: {self.total_reward:.2f} ~ "
             + f"Total Profit: {self._total_profit:.2f} ~ "
-            + f"Trades: {int(len(self.trade_history) // 2)}",
+            + f"Trades: {len(self.trade_history) // 2}",
         )
         fig.tight_layout()
         return fig
@@ -3515,7 +3534,7 @@ class InfoMetricsCallback(TensorboardCallback):
                     aggregated_info[f"{k}_mode"] = "mixed"
 
         self._safe_logger_record(
-            "info/n_envs", int(len(infos_list)), exclude=logger_exclude
+            "info/n_envs", len(infos_list), exclude=logger_exclude
         )
 
         if filtered_values > 0:
@@ -3747,52 +3766,66 @@ class MaskableTrialEvalCallback(MaskableEvalCallback):
     def _on_step(self) -> bool:
         if self.is_pruned:
             return False
+
         _super_on_step = super()._on_step()
         if not _super_on_step:
             return False
+
         if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0:
             self.eval_idx += 1
+
             try:
                 last_mean_reward = float(getattr(self, "last_mean_reward", np.nan))
             except Exception as e:
                 logger.warning(
-                    "Optuna: invalid last_mean_reward at eval %s: %r", self.eval_idx, e
+                    "Optuna: invalid last_mean_reward (eval_idx=%s, timesteps=%s): %r",
+                    self.eval_idx,
+                    self.num_timesteps,
+                    e,
                 )
                 self.is_pruned = True
                 return False
+
             if not np.isfinite(last_mean_reward):
                 logger.warning(
-                    "Optuna: non-finite last_mean_reward at eval %s", self.eval_idx
+                    "Optuna: non-finite last_mean_reward (eval_idx=%s, timesteps=%s)",
+                    self.eval_idx,
+                    self.num_timesteps,
                 )
                 self.is_pruned = True
                 return False
+
             try:
-                self.trial.report(last_mean_reward, self.eval_idx)
+                self.trial.report(last_mean_reward, self.num_timesteps)
             except Exception as e:
                 logger.warning(
-                    "Optuna: trial.report failed at eval %s: %r", self.eval_idx, e
+                    "Optuna: trial.report failed (eval_idx=%s, timesteps=%s): %r",
+                    self.eval_idx,
+                    self.num_timesteps,
+                    e,
                 )
                 self.is_pruned = True
                 return False
+
             try:
                 best_mean_reward = float(getattr(self, "best_mean_reward", np.nan))
             except Exception as e:
                 logger.warning(
-                    "Optuna: invalid best_mean_reward at eval %s: %r",
+                    "Optuna: invalid best_mean_reward (eval_idx=%s, timesteps=%s): %r",
                     self.eval_idx,
+                    self.num_timesteps,
                     e,
                 )
+                best_mean_reward = np.nan
+
             try:
                 logger_exclude = ("stdout", "log", "json", "csv")
+                self.logger.record("eval/idx", self.eval_idx, exclude=logger_exclude)
                 self.logger.record(
-                    "eval/idx",
-                    int(self.eval_idx),
-                    exclude=logger_exclude,
+                    "eval/num_timesteps", self.num_timesteps, exclude=logger_exclude
                 )
                 self.logger.record(
-                    "eval/last_mean_reward",
-                    last_mean_reward,
-                    exclude=logger_exclude,
+                    "eval/last_mean_reward", last_mean_reward, exclude=logger_exclude
                 )
                 if np.isfinite(best_mean_reward):
                     self.logger.record(
@@ -3802,24 +3835,34 @@ class MaskableTrialEvalCallback(MaskableEvalCallback):
                     )
                 else:
                     logger.warning(
-                        "Optuna: non-finite best_mean_reward at eval %s", self.eval_idx
+                        "Optuna: non-finite best_mean_reward (eval_idx=%s, timesteps=%s)",
+                        self.eval_idx,
+                        self.num_timesteps,
                     )
             except Exception as e:
                 logger.error(
-                    "Optuna: logger.record failed at eval %s: %r", self.eval_idx, e
+                    "Optuna: logger.record failed (eval_idx=%s, timesteps=%s): %r",
+                    self.eval_idx,
+                    self.num_timesteps,
+                    e,
                 )
+
             try:
                 if self.trial.should_prune():
                     logger.info(
-                        "Optuna: trial pruned at eval %s (score=%.5f)",
+                        "Optuna: trial pruned (eval_idx=%s, timesteps=%s, score=%.5f)",
                         self.eval_idx,
+                        self.num_timesteps,
                         last_mean_reward,
                     )
                     self.is_pruned = True
                     return False
             except Exception as e:
                 logger.warning(
-                    "Optuna: trial.should_prune failed at eval %s: %r", self.eval_idx, e
+                    "Optuna: trial.should_prune failed (eval_idx=%s, timesteps=%s): %r",
+                    self.eval_idx,
+                    self.num_timesteps,
+                    e,
                 )
                 self.is_pruned = True
                 return False
diff --git a/quickadapter/user_data/strategies/QuickAdapterV3.py b/quickadapter/user_data/strategies/QuickAdapterV3.py
index 52a7a83..8cd085d 100644
--- a/quickadapter/user_data/strategies/QuickAdapterV3.py
+++ b/quickadapter/user_data/strategies/QuickAdapterV3.py
@@ -144,7 +144,7 @@ class QuickAdapterV3(IStrategy):
 
     _CUSTOM_STOPLOSS_NATR_RATIO_PERCENT: Final[float] = 0.7860
 
-    _ANNOTATION_LINE_OFFSET_CANDLES: Final[int] = 11
+    _ANNOTATION_LINE_OFFSET_CANDLES: Final[int] = 10
 
     timeframe_minutes = timeframe_to_minutes(timeframe)
     minimal_roi = {str(timeframe_minutes * 864): -1}
-- 
2.43.0
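
The sketches below are not part of the patch; they reproduce, in isolation, the arithmetic the patch introduces, for readers who do not have the codebase at hand. First, the evaluation-frequency logic described in the reworked get_eval_freq() docstring. The helper name, the n_eval_steps default and the example numbers (4 envs, a 100 000-timestep budget) are assumptions for illustration only, not values taken from the repository.

import math
from typing import Optional

PPO_N_STEPS = (512, 1024, 2048, 4096)

def eval_freq_in_n_calls(
    total_timesteps: int,
    n_envs: int,
    is_ppo: bool,
    n_steps: Optional[int] = None,
    n_eval_steps: int = 5000,  # illustrative default, not the repository's value
    hyperopt: bool = False,
    reduction_factor: float = 4.0,
) -> int:
    if total_timesteps <= 0:
        return 1
    # SB3 evaluates every `eval_freq` callback calls; one call advances n_envs
    # timesteps, so the cap is expressed in n_calls units rather than raw timesteps.
    max_n_calls = max(1, total_timesteps // n_envs)
    if is_ppo:
        if isinstance(n_steps, int) and n_steps > 0:
            freq = max(1, min(n_steps, max_n_calls))
        else:
            freq = next(
                (s for s in sorted(PPO_N_STEPS, reverse=True) if s <= max_n_calls),
                max_n_calls,
            )
    else:
        freq = max(1, math.ceil(n_eval_steps / n_envs))
    if hyperopt and reduction_factor > 1.0:
        freq = max(1, round(freq / reduction_factor))
    return min(freq, max_n_calls)

# PPO with n_steps=2048 and 4 envs over a 100_000-timestep budget evaluates every
# 2048 callback calls, i.e. every 2048 * 4 = 8192 environment timesteps.
assert eval_freq_in_n_calls(100_000, 4, True, n_steps=2048) == 2048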
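
The repeated ((value + multiple - 1) // multiple) * multiple expression that the patch folds into _ceil_to_multiple() is plain ceiling-to-a-multiple rounding, used here to keep the timestep budget aligned to whole PPO rollouts (n_steps * n_envs). A minimal sketch with assumed numbers (2048 steps, 4 envs, a 100 000-timestep budget), none of which come from the repository defaults:

def ceil_to_multiple(value: int, multiple: int) -> int:
    # Round value up to the next multiple, leaving exact multiples unchanged.
    return ((value + multiple - 1) // multiple) * multiple

n_steps, n_envs = 2048, 4
rollout = n_steps * n_envs            # one PPO rollout = 8192 timesteps
total_timesteps = 100_000
aligned = ceil_to_multiple(total_timesteps, rollout)
assert aligned == 106_496             # 13 full rollouts, so no rollout is truncated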
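
The new _ppo_resources() helper picks the trial-budget bounds handed to Optuna for PPO: the minimum resource is roughly one hyperopt-reduced evaluation interval at the smallest rollout size, converted to environment timesteps, and the maximum resource is the full budget rounded up to the largest rollout. A standalone sketch of that arithmetic, again with assumed inputs (4 envs, 100 000 timesteps):

PPO_N_STEPS_MIN, PPO_N_STEPS_MAX = 512, 4096
HYPEROPT_EVAL_FREQ_REDUCTION_FACTOR = 4.0

def ceil_to_multiple(value: int, multiple: int) -> int:
    return ((value + multiple - 1) // multiple) * multiple

def ppo_resources(total_timesteps: int, n_envs: int) -> tuple[int, int]:
    # Smallest useful budget: one reduced eval interval, expressed in timesteps.
    min_resource = (
        max(1, round(PPO_N_STEPS_MIN / HYPEROPT_EVAL_FREQ_REDUCTION_FACTOR)) * n_envs
    )
    # Largest budget: the whole run, aligned to a full rollout of the biggest n_steps.
    rollout = PPO_N_STEPS_MAX * n_envs
    return min_resource, max(min_resource, ceil_to_multiple(total_timesteps, rollout))

# 512 / 4.0 = 128 calls -> 512 timesteps; 100_000 rounded up to 16_384 -> 114_688.
assert ppo_resources(100_000, 4) == (512, 114_688)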
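
Finally, MaskableTrialEvalCallback now reports to Optuna at self.num_timesteps instead of the bare evaluation counter, so the pruner sees progress in the same unit as the min/max resources above. The illustration below is a guess at how such reporting interacts with a resource-based pruner; the toy training loop and the HyperbandPruner settings are invented for the example and do not mirror the ReforceXY configuration.

import optuna

def objective(trial: optuna.Trial) -> float:
    num_timesteps = 0
    mean_reward = 0.0
    for _ in range(10):                            # stand-in for a training loop
        num_timesteps += 512                       # one rollout worth of timesteps
        mean_reward += 0.1                         # stand-in for an evaluation result
        trial.report(mean_reward, num_timesteps)   # report on the timestep axis
        if trial.should_prune():
            raise optuna.TrialPruned()
    return mean_reward

study = optuna.create_study(
    direction="maximize",
    pruner=optuna.pruners.HyperbandPruner(min_resource=512, max_resource=5120),
)
study.optimize(objective, n_trials=5)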