From: Jérôme Benoit Date: Fri, 26 Dec 2025 20:37:58 +0000 (+0100) Subject: fix(ReforceXY): remove PBRS reward duration ratio clamping X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=01fa6f89bdbce76c802edaea0bba8398828a5c32;p=freqai-strategies.git fix(ReforceXY): remove PBRS reward duration ratio clamping Signed-off-by: Jérôme Benoit --- diff --git a/ReforceXY/reward_space_analysis/README.md b/ReforceXY/reward_space_analysis/README.md index b830dfc..95c53ee 100644 --- a/ReforceXY/reward_space_analysis/README.md +++ b/ReforceXY/reward_space_analysis/README.md @@ -351,7 +351,7 @@ multiplier for loss-side holds: where: - `r_pnl = pnl / pnl_target` -- `r_dur = clamp(duration_ratio, 0, 1)` +- `r_dur = max(duration_ratio, 0)` - `scale = base_factor · hold_potential_ratio` - `g = hold_potential_gain` - `T_pnl`, `T_dur` = configured transforms diff --git a/ReforceXY/reward_space_analysis/pyproject.toml b/ReforceXY/reward_space_analysis/pyproject.toml index c99aa8f..7f77c7f 100644 --- a/ReforceXY/reward_space_analysis/pyproject.toml +++ b/ReforceXY/reward_space_analysis/pyproject.toml @@ -12,14 +12,13 @@ dependencies = [ "pandas", "scikit-learn", "scipy>=1.11", - "pytest", ] -[dependency-groups] +[project.optional-dependencies] dev = [ - "pytest>=6.0", - "ruff", - "pytest-cov>=7.0.0", + "pytest>=8.0", + "pytest-cov>=7.0", + "ruff>=0.8", ] [build-system] @@ -59,22 +58,37 @@ log_cli_level = "INFO" log_cli_format = "%(asctime)s [%(levelname)8s] %(name)s: %(message)s" log_cli_date_format = "%Y-%m-%d %H:%M:%S" -# Coverage configuration addopts = [ "--verbose", "--tb=short", "--strict-markers", "--color=yes", - "--cov=reward_space_analysis", - "--cov-config=pyproject.toml", - "--cov-fail-under=85" + "--cov", ] [tool.coverage.run] source = ["reward_space_analysis"] +branch = true +parallel = true +relative_files = true omit = [ - "tests/*", - "test_*.py", + "*/tests/*", + "**/test_*.py", + "**/__pycache__/*", +] + +[tool.coverage.report] +show_missing = true +skip_empty = true +fail_under = 85 +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "raise AssertionError", + "raise NotImplementedError", + "if TYPE_CHECKING:", + "if __name__ == .__main__.:", + "@abstractmethod", ] [tool.ruff] @@ -82,5 +96,26 @@ line-length = 100 target-version = "py311" [tool.ruff.lint] -select = ["E", "F", "W", "I"] -ignore = ["E501"] +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "I", # isort + "B", # flake8-bugbear + "C4", # flake8-comprehensions + "UP", # pyupgrade + "SIM", # flake8-simplify + "TCH", # flake8-type-checking + "PTH", # flake8-use-pathlib + "RUF", # ruff-specific rules +] +ignore = [ + "E501", # line too long +] + +[tool.ruff.lint.isort] +known-first-party = ["reward_space_analysis"] + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" diff --git a/ReforceXY/reward_space_analysis/reward_space_analysis.py b/ReforceXY/reward_space_analysis/reward_space_analysis.py index dd476a9..3cc3b11 100644 --- a/ReforceXY/reward_space_analysis/reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/reward_space_analysis.py @@ -18,7 +18,7 @@ import random import warnings from enum import Enum, IntEnum from pathlib import Path -from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Literal import numpy as np import pandas as pd @@ -26,6 +26,9 @@ from scipy import stats from scipy.spatial.distance import jensenshannon from scipy.stats import entropy, probplot +if TYPE_CHECKING: + 
from collections.abc import Iterable + try: from sklearn.ensemble import RandomForestRegressor from sklearn.inspection import partial_dependence, permutation_importance @@ -73,15 +76,15 @@ DEFAULT_IDLE_DURATION_MULTIPLIER = 4 # When that diagnostic column is not available (e.g., reporting from partial datasets), # we fall back to the weaker heuristic |Σ shaping| < PBRS_INVARIANCE_TOL. PBRS_INVARIANCE_TOL: float = 1e-6 -# Default discount factor γ for potential-based reward shaping +# Default discount factor γ for potential-based reward shaping # noqa: RUF003 POTENTIAL_GAMMA_DEFAULT: float = 0.95 # Default risk/reward ratio (RR) RISK_REWARD_RATIO_DEFAULT: float = 2.0 # Supported attenuation modes -ATTENUATION_MODES: Tuple[str, ...] = ("sqrt", "linear", "power", "half_life") -ATTENUATION_MODES_WITH_LEGACY: Tuple[str, ...] = ("legacy",) + ATTENUATION_MODES +ATTENUATION_MODES: tuple[str, ...] = ("sqrt", "linear", "power", "half_life") +ATTENUATION_MODES_WITH_LEGACY: tuple[str, ...] = ("legacy", *ATTENUATION_MODES) # Internal numeric guards and behavior toggles INTERNAL_GUARDS: dict[str, float] = { @@ -116,10 +119,10 @@ ALLOWED_EXIT_POTENTIAL_MODES = { } # Supported trading modes -TRADING_MODES: Tuple[str, ...] = ("spot", "margin", "futures") +TRADING_MODES: tuple[str, ...] = ("spot", "margin", "futures") # Supported p-value adjustment methods -ADJUST_METHODS: Tuple[str, ...] = ("none", "benjamini_hochberg") +ADJUST_METHODS: tuple[str, ...] = ("none", "benjamini_hochberg") # Alias without underscore for convenience _ADJUST_METHODS_ALIASES: frozenset[str] = frozenset({"benjaminihochberg"}) @@ -154,7 +157,7 @@ DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = { "exit_factor_threshold": 1000.0, # === PBRS PARAMETERS === # Potential-based reward shaping core parameters - # Discount factor γ for potential term (0 ≤ γ ≤ 1) + # Discount factor γ for potential term (0 ≤ γ ≤ 1) # noqa: RUF003 "potential_gamma": POTENTIAL_GAMMA_DEFAULT, # Exit potential modes: canonical | non_canonical | progressive_release | spike_cancel | retain_previous "exit_potential_mode": "canonical", @@ -181,7 +184,7 @@ DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = { "exit_additive_transform_duration": "tanh", } -DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = { +DEFAULT_MODEL_REWARD_PARAMETERS_HELP: dict[str, str] = { "invalid_action": "Penalty for invalid actions", "base_factor": "Base reward scale", "idle_penalty_power": "Idle penalty exponent", @@ -203,9 +206,9 @@ DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = { "check_invariants": "Enable runtime invariant checks", "exit_factor_threshold": "Warn if |exit_factor| exceeds", # PBRS parameters - "potential_gamma": "PBRS discount γ (0–1)", + "potential_gamma": "PBRS discount γ (0-1)", # noqa: RUF001 "exit_potential_mode": "Exit potential mode (canonical|non_canonical|progressive_release|spike_cancel|retain_previous)", - "exit_potential_decay": "Decay for progressive_release (0–1)", + "exit_potential_decay": "Decay for progressive_release (0-1)", "hold_potential_enabled": "Enable hold potential Φ", "hold_potential_ratio": "Hold potential ratio", "hold_potential_gain": "Hold potential gain", @@ -230,7 +233,7 @@ DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = { # Parameter validation utilities # --------------------------------------------------------------------------- -_PARAMETER_BOUNDS: Dict[str, Dict[str, float]] = { +_PARAMETER_BOUNDS: dict[str, dict[str, float]] = { # key: {min: ..., max: ...} (bounds are inclusive where it makes sense) "invalid_action": 
{"max": 0.0}, # penalty should be <= 0 "base_factor": {"min": 0.0}, @@ -261,8 +264,8 @@ _PARAMETER_BOUNDS: Dict[str, Dict[str, float]] = { "exit_additive_gain": {"min": 0.0}, } -RewardParamValue = Union[float, str, bool, None] -RewardParams = Dict[str, RewardParamValue] +RewardParamValue = float | str | bool | None +RewardParams = dict[str, RewardParamValue] class RewardDiagnosticsWarning(RuntimeWarning): @@ -316,7 +319,7 @@ def _to_bool(value: Any) -> bool: raise ValueError(f"Param: unrecognized boolean literal {value!r}") -def _get_bool_param(params: RewardParams, key: str, default: Optional[bool] = None) -> bool: +def _get_bool_param(params: RewardParams, key: str, default: bool | None = None) -> bool: """Extract boolean parameter with type safety. Args: @@ -363,7 +366,7 @@ def _resolve_additive_enablement( def _get_float_param( - params: RewardParams, key: str, default: Optional[RewardParamValue] = None + params: RewardParams, key: str, default: RewardParamValue | None = None ) -> float: """Extract float parameter with type safety and default fallback. @@ -409,7 +412,7 @@ def _clamp_float_to_bounds( key: str, value: float, *, - bounds: Optional[Dict[str, float]] = None, + bounds: dict[str, float] | None = None, strict: bool, ) -> tuple[float, list[str]]: """Clamp numeric `value` to bounds for `key`. @@ -452,9 +455,7 @@ def _clamp_float_to_bounds( return adjusted, reason_parts -def _get_int_param( - params: RewardParams, key: str, default: Optional[RewardParamValue] = None -) -> int: +def _get_int_param(params: RewardParams, key: str, default: RewardParamValue | None = None) -> int: """Extract integer parameter with robust coercion. Args: @@ -502,7 +503,7 @@ def _get_int_param( return int(default) if isinstance(default, (int, float)) else 0 -def _get_str_param(params: RewardParams, key: str, default: Optional[str] = None) -> str: +def _get_str_param(params: RewardParams, key: str, default: str | None = None) -> str: """Extract string parameter with type safety and default fallback. Args: @@ -547,7 +548,7 @@ def _fail_safely(reason: str) -> float: def get_max_idle_duration_candles( params: RewardParams, *, - max_trade_duration_candles: Optional[int] = None, + max_trade_duration_candles: int | None = None, ) -> int: mtd = ( int(max_trade_duration_candles) @@ -569,7 +570,7 @@ def get_max_idle_duration_candles( def validate_reward_parameters( params: RewardParams, strict: bool = True, -) -> Tuple[RewardParams, Dict[str, Dict[str, Any]]]: +) -> tuple[RewardParams, dict[str, dict[str, Any]]]: """Clamp parameters to bounds and coerce booleans and numeric overrides. Returns a sanitized copy plus adjustments mapping (param -> original/adjusted/reason). @@ -578,10 +579,10 @@ def validate_reward_parameters( - Numeric-bounded keys are coerced to float when provided as str/bool/None. * In strict mode: raise on non-numeric or out-of-bounds. * In relaxed mode: fallback to min bound or 0.0 with adjustment reason. - - Non‑finite numerics fall back to min bound or 0.0 (relaxed) or raise (strict). + - Non-finite numerics fall back to min bound or 0.0 (relaxed) or raise (strict). 
""" sanitized = dict(params) - adjustments: Dict[str, Dict[str, Any]] = {} + adjustments: dict[str, dict[str, Any]] = {} # Boolean parameter coercion _bool_keys = [ @@ -665,7 +666,7 @@ def validate_reward_parameters( if not np.isclose(adjusted, original_numeric): sanitized[key] = adjusted prev_reason = adjustments.get(key, {}).get("reason") - reason: List[str] = [] + reason: list[str] = [] if prev_reason: reason.append(prev_reason) reason.extend(reason_parts) @@ -781,7 +782,7 @@ class RewardBreakdown: next_potential: float = 0.0 # PBRS helpers base_reward: float = 0.0 - pbrs_delta: float = 0.0 # Δ(s,a,s') = γ·Φ(s') − Φ(s) + pbrs_delta: float = 0.0 # Δ(s,a,s') = γ·Φ(s') − Φ(s) # noqa: RUF003 invariance_correction: float = 0.0 @@ -876,7 +877,7 @@ def _compute_time_attenuation_coefficient( else: effective_dr = duration_ratio - kernel = kernels.get(exit_attenuation_mode, None) + kernel = kernels.get(exit_attenuation_mode) if kernel is None: _warn_unknown_mode( "exit_attenuation_mode", @@ -912,12 +913,12 @@ def _get_exit_factor( """ Compute exit reward factor by applying multiplicative coefficients to base_factor. - Formula: exit_factor = base_factor × time_attenuation_coefficient × pnl_target_coefficient × efficiency_coefficient + Formula: exit_factor = base_factor * time_attenuation_coefficient * pnl_target_coefficient * efficiency_coefficient Args: base_factor: Base reward value before coefficient adjustments pnl: Realized profit/loss - pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio) + pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio) duration_ratio: Trade duration relative to target duration context: Trade context with unrealized profit/loss extremes params: Reward configuration parameters @@ -955,7 +956,7 @@ def _get_exit_factor( if exit_factor < 0.0 and pnl >= 0.0: exit_factor = 0.0 exit_factor_threshold = _get_float_param(params, "exit_factor_threshold") - if exit_factor_threshold > 0 and np.isfinite(exit_factor_threshold): + if exit_factor_threshold > 0 and np.isfinite(exit_factor_threshold): # noqa: SIM102 if abs(exit_factor) > exit_factor_threshold: warnings.warn( f"|exit_factor|={abs(exit_factor):.2f} > threshold={exit_factor_threshold:.2f}", @@ -982,7 +983,7 @@ def _compute_pnl_target_coefficient( Args: params: Reward configuration parameters pnl: Realized profit/loss - pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio) + pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio) risk_reward_ratio: Risk/reward ratio for loss penalty calculation Returns: @@ -1134,14 +1135,14 @@ def _compute_exit_reward( Args: base_factor: Base reward value before coefficient adjustments - pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio) + pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio) duration_ratio: Trade duration relative to target duration context: Trade context with PnL and unrealized profit/loss extremes params: Reward configuration parameters risk_reward_ratio: Risk/reward ratio (must match the value used to calculate pnl_target) Returns: - float: Exit reward (pnl × exit_factor) + float: Exit reward (pnl * exit_factor) """ exit_factor = _get_exit_factor( base_factor, context.pnl, pnl_target, duration_ratio, context, params, risk_reward_ratio @@ -1168,7 +1169,7 @@ def calculate_reward( short_allowed=short_allowed, ) - base_reward: Optional[float] = None + base_reward: float | None = None if not is_valid and not 
action_masking: breakdown.invalid_penalty = _get_float_param(params, "invalid_action") base_reward = breakdown.invalid_penalty @@ -1516,7 +1517,7 @@ def simulate_samples( ) max_trade_duration_cap = int(max_trade_duration_candles * max_duration_ratio) - samples: list[Dict[str, float]] = [] + samples: list[dict[str, float]] = [] prev_potential: float = 0.0 # Stateful trajectory variables @@ -1763,7 +1764,7 @@ def _validate_simulation_invariants(df: pd.DataFrame) -> None: ) -def _compute_summary_stats(df: pd.DataFrame) -> Dict[str, Any]: +def _compute_summary_stats(df: pd.DataFrame) -> dict[str, Any]: """Compute summary statistics without writing to file.""" action_summary = df.groupby("action")["reward"].agg(["count", "mean", "std", "min", "max"]) component_share = df[ @@ -1835,7 +1836,7 @@ def _binned_stats( return aggregated -def _compute_relationship_stats(df: pd.DataFrame) -> Dict[str, Any]: +def _compute_relationship_stats(df: pd.DataFrame) -> dict[str, Any]: """Return binned stats dict for idle, trade duration and pnl (uniform bins). Defensive against missing optional columns (e.g., reward_invalid when synthetic @@ -1897,7 +1898,7 @@ def _compute_representativity_stats( df: pd.DataFrame, profit_aim: float, risk_reward_ratio: float, -) -> Dict[str, Any]: +) -> dict[str, Any]: """Compute representativity statistics for the reward space.""" pnl_target = float(profit_aim * risk_reward_ratio) total = len(df) @@ -1942,7 +1943,7 @@ def _perform_feature_analysis( skip_partial_dependence: bool = False, rf_n_jobs: int = 1, perm_n_jobs: int = 1, -) -> Tuple[pd.DataFrame, Dict[str, Any], Dict[str, pd.DataFrame], Optional[RandomForestRegressor]]: +) -> tuple[pd.DataFrame, dict[str, Any], dict[str, pd.DataFrame], RandomForestRegressor | None]: """Compute feature importances using RandomForestRegressor. Parameters @@ -2064,7 +2065,7 @@ def _perform_feature_analysis( n_test=0, ) - model: Optional[RandomForestRegressor] = RandomForestRegressor( + model: RandomForestRegressor | None = RandomForestRegressor( n_estimators=400, max_depth=None, random_state=seed, @@ -2119,7 +2120,7 @@ def _perform_feature_analysis( ) # Partial dependence (optional) - partial_deps: Dict[str, pd.DataFrame] = {} + partial_deps: dict[str, pd.DataFrame] = {} if model is not None and not skip_partial_dependence: for feature in [ f for f in ["trade_duration", "idle_duration", "pnl"] if f in X_test.columns @@ -2192,10 +2193,10 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr else: try: df = pd.DataFrame(list(candidate)) - except TypeError: + except TypeError as e: raise ValueError( f"Data: 'transitions' in '{path}' is not iterable (type {type(candidate)!r})" - ) + ) from e except Exception as e: raise ValueError( f"Data: could not build DataFrame from 'transitions' in '{path}': {e!r}" @@ -2214,10 +2215,10 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr else: try: all_transitions.extend(list(trans)) - except TypeError: + except TypeError as e: raise ValueError( f"Data: episode 'transitions' is not iterable in '{path}' (type {type(trans)!r})" - ) + ) from e else: skipped += 1 if skipped: @@ -2298,7 +2299,7 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr if enforce_columns: raise ValueError( f"Data: missing required columns {sorted(missing_required)}. 
" - f"Found: {sorted(list(df.columns))}" + f"Found: {sorted(df.columns)}" ) warnings.warn( f"Missing columns {sorted(missing_required)}; filled with NaN when loading (enforce_columns=False)", @@ -2329,7 +2330,7 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr def compute_distribution_shift_metrics( synthetic_df: pd.DataFrame, real_df: pd.DataFrame, -) -> Dict[str, float]: +) -> dict[str, float]: """Compute distribution shift metrics between synthetic and real samples. Returns KL divergence, JS distance, Wasserstein distance, and KS test @@ -2395,7 +2396,7 @@ def compute_distribution_shift_metrics( return metrics -def _validate_distribution_metrics(metrics: Dict[str, float]) -> None: +def _validate_distribution_metrics(metrics: dict[str, float]) -> None: """Validate mathematical bounds of distribution shift metrics.""" for key, value in metrics.items(): if not np.isfinite(value): @@ -2406,28 +2407,25 @@ def _validate_distribution_metrics(metrics: Dict[str, float]) -> None: raise AssertionError(f"KL divergence {key} must be >= 0, got {value:.6f}") # JS distance must be in [0, 1] - if "js_distance" in key: - if not (0 <= value <= 1): - raise AssertionError(f"JS distance {key} must be in [0,1], got {value:.6f}") + if "js_distance" in key and not (0 <= value <= 1): + raise AssertionError(f"JS distance {key} must be in [0,1], got {value:.6f}") # Wasserstein distance must be >= 0 if "wasserstein" in key and value < 0: raise AssertionError(f"Wasserstein distance {key} must be >= 0, got {value:.6f}") # KS statistic must be in [0, 1] - if "ks_statistic" in key: - if not (0 <= value <= 1): - raise AssertionError(f"KS statistic {key} must be in [0,1], got {value:.6f}") + if "ks_statistic" in key and not (0 <= value <= 1): + raise AssertionError(f"KS statistic {key} must be in [0,1], got {value:.6f}") # p-values must be in [0, 1] - if "pvalue" in key: - if not (0 <= value <= 1): - raise AssertionError(f"p-value {key} must be in [0,1], got {value:.6f}") + if "pvalue" in key and not (0 <= value <= 1): + raise AssertionError(f"p-value {key} must be in [0,1], got {value:.6f}") def statistical_hypothesis_tests( df: pd.DataFrame, *, adjust_method: str = ADJUST_METHODS[0], seed: int = 42 -) -> Dict[str, Any]: +) -> dict[str, Any]: """Statistical hypothesis tests (Spearman, Kruskal-Wallis, Mann-Whitney). 
Parameters @@ -2547,7 +2545,7 @@ def statistical_hypothesis_tests( adj_final = np.empty_like(adj_sorted) adj_final[order] = np.clip(adj_sorted, 0, 1) # Attach adjusted p-values and recompute significance - for (name, res), p_adj in zip(items, adj_final): + for (name, res), p_adj in zip(items, adj_final, strict=False): res["p_value_adj"] = float(p_adj) res["significant_adj"] = bool(p_adj < alpha) results[name] = res @@ -2558,7 +2556,7 @@ def statistical_hypothesis_tests( return results -def _validate_hypothesis_test_results(results: Dict[str, Any]) -> None: +def _validate_hypothesis_test_results(results: dict[str, Any]) -> None: """Validate statistical properties of hypothesis test results.""" for test_name, result in results.items(): # All p-values must be in [0, 1] or NaN (for cases like constant input) @@ -2616,13 +2614,13 @@ def _validate_hypothesis_test_results(results: Dict[str, Any]) -> None: def bootstrap_confidence_intervals( df: pd.DataFrame, - metrics: List[str], + metrics: list[str], n_bootstrap: int = 10000, confidence_level: float = 0.95, seed: int = 42, *, strict_diagnostics: bool = False, -) -> Dict[str, Tuple[float, float, float]]: +) -> dict[str, tuple[float, float, float]]: """Compute bootstrap confidence intervals for metric means. Returns percentile-based CIs, skipping metrics with <10 samples. @@ -2639,6 +2637,7 @@ def bootstrap_confidence_intervals( warnings.warn( f"n_bootstrap={n_bootstrap} < {min_rec}; confidence intervals may be unstable", RewardDiagnosticsWarning, + stacklevel=2, ) # Local RNG to avoid mutating global NumPy RNG state @@ -2686,7 +2685,7 @@ def bootstrap_confidence_intervals( def _validate_bootstrap_results( - results: Dict[str, Tuple[float, float, float]], *, strict_diagnostics: bool + results: dict[str, tuple[float, float, float]], *, strict_diagnostics: bool ) -> None: """Validate each bootstrap CI: finite bounds, ordered, positive width (adjust or raise).""" for metric, (mean, ci_low, ci_high) in results.items(): @@ -2710,10 +2709,7 @@ def _validate_bootstrap_results( if strict_diagnostics: raise AssertionError(f"Bootstrap CI for {metric}: non-positive width {width:.6f}") # Graceful mode: expand interval symmetrically - if width == 0: - epsilon = INTERNAL_GUARDS["degenerate_ci_epsilon"] - else: - epsilon = abs(width) * 1e-6 + epsilon = INTERNAL_GUARDS["degenerate_ci_epsilon"] if width == 0 else abs(width) * 1e-06 center = mean # Adjust only if current bounds are identical; otherwise enforce ordering minimally. if ci_low == ci_high: @@ -2728,6 +2724,7 @@ def _validate_bootstrap_results( warnings.warn( f"bootstrap_ci for '{metric}' degenerate (width={width:.6e}); adjusted with epsilon={epsilon:.1e}", RewardDiagnosticsWarning, + stacklevel=2, ) @@ -2736,7 +2733,7 @@ def distribution_diagnostics( *, seed: int | None = None, strict_diagnostics: bool = False, -) -> Dict[str, Any]: +) -> dict[str, Any]: """Return mapping col-> diagnostics (tests, moments, entropy, divergences). Skips missing columns; selects Shapiro-Wilk when n<=5000 else K2; ignores non-finite intermediates. @@ -2763,7 +2760,7 @@ def distribution_diagnostics( msg = f"Extreme moment(s) for {col}: skew={skew_v:.3e}, kurtosis={kurt_v:.3e} exceeds threshold {thr}." 
if strict_diagnostics: raise AssertionError(msg) - warnings.warn(msg, RewardDiagnosticsWarning) + warnings.warn(msg, RewardDiagnosticsWarning, stacklevel=2) if len(data) < 5000: sw_stat, sw_pval = stats.shapiro(data) @@ -2785,7 +2782,7 @@ def distribution_diagnostics( return diagnostics -def _validate_distribution_diagnostics(diag: Dict[str, Any], *, strict_diagnostics: bool) -> None: +def _validate_distribution_diagnostics(diag: dict[str, Any], *, strict_diagnostics: bool) -> None: """Validate mathematical properties of distribution diagnostics. Ensures all reported statistics are finite and within theoretical bounds where applicable. @@ -2800,7 +2797,7 @@ def _validate_distribution_diagnostics(diag: Dict[str, Any], *, strict_diagnosti zero_var_columns.add(prefix) for key, value in list(diag.items()): - if any(suffix in key for suffix in ["_mean", "_std", "_skewness", "_kurtosis"]): + if any(suffix in key for suffix in ["_mean", "_std", "_skewness", "_kurtosis"]): # noqa: SIM102 if not np.isfinite(value): # Graceful degradation for constant distributions: skewness/kurtosis become NaN. constant_problem = any( @@ -2814,13 +2811,13 @@ def _validate_distribution_diagnostics(diag: Dict[str, Any], *, strict_diagnosti warnings.warn( f"{key} undefined (constant distribution); falling back to {fallback}", RewardDiagnosticsWarning, + stacklevel=2, ) else: raise AssertionError(f"Distribution diagnostic {key} is not finite: {value}") - if key.endswith("_shapiro_pval"): - if not (0 <= value <= 1): - raise AssertionError(f"Shapiro p-value {key} must be in [0,1], got {value}") - if key.endswith("_anderson_stat") or key.endswith("_anderson_critical_5pct"): + if key.endswith("_shapiro_pval") and not (0 <= value <= 1): + raise AssertionError(f"Shapiro p-value {key} must be in [0,1], got {value}") + if key.endswith("_anderson_stat") or key.endswith("_anderson_critical_5pct"): # noqa: SIM102 if not np.isfinite(value): prefix = key.rsplit("_", 2)[0] if prefix in zero_var_columns and not strict_diagnostics: @@ -2829,10 +2826,11 @@ def _validate_distribution_diagnostics(diag: Dict[str, Any], *, strict_diagnosti warnings.warn( f"{key} undefined (constant distribution); falling back to {fallback}", RewardDiagnosticsWarning, + stacklevel=2, ) continue raise AssertionError(f"Anderson statistic {key} must be finite, got {value}") - if key.endswith("_qq_r_squared"): + if key.endswith("_qq_r_squared"): # noqa: SIM102 if not (isinstance(value, (int, float)) and np.isfinite(value) and 0 <= value <= 1): prefix = key[: -len("_qq_r_squared")] if prefix in zero_var_columns and not strict_diagnostics: @@ -2841,6 +2839,7 @@ def _validate_distribution_diagnostics(diag: Dict[str, Any], *, strict_diagnosti warnings.warn( f"{key} undefined (constant distribution); falling back to {fallback_r2}", RewardDiagnosticsWarning, + stacklevel=2, ) else: raise AssertionError(f"Q-Q R^2 {key} must be in [0,1], got {value}") @@ -2868,7 +2867,7 @@ def _apply_transform_arctan(value: float) -> float: def _apply_transform_sigmoid(value: float) -> float: - """sigmoid: 2σ(x) - 1, σ(x) = 1/(1 + e^(-x)) in (-1, 1).""" + """sigmoid: 2σ(x) - 1, σ(x) = 1/(1 + e^(-x)) in (-1, 1).""" # noqa: RUF002 x = value try: if x >= 0: @@ -3196,13 +3195,13 @@ def compute_pbrs_components( R'(s,a,s') = R(s,a,s') + Δ(s,a,s') where: - Δ(s,a,s') = γ·Φ(s') - Φ(s) (PBRS shaping term) + Δ(s,a,s') = gamma * Phi(s') - Phi(s) (PBRS shaping term) Hold Potential Formula ---------------------- Let: r_pnl = pnl / pnl_target - r_dur = clamp(duration_ratio, 0, 1) + r_dur = 
max(duration_ratio, 0) scale = base_factor · hold_potential_ratio g = gain T_pnl, T_dur = configured bounded transforms @@ -3345,7 +3344,7 @@ def _compute_pnl_duration_signal( non_finite_key: str, *, base_factor: float, - risk_reward_ratio: Optional[float] = None, + risk_reward_ratio: float | None = None, ) -> float: """Generic helper for (pnl, duration) bi-component transforms.""" if not (np.isfinite(pnl) and np.isfinite(pnl_target) and np.isfinite(duration_ratio)): @@ -3354,7 +3353,7 @@ def _compute_pnl_duration_signal( return _fail_safely(f"{kind}_invalid_pnl_target") pnl_ratio = float(pnl / pnl_target) - duration_ratio = float(np.clip(duration_ratio, 0.0, 1.0)) + duration_ratio = float(max(0.0, duration_ratio)) ratio = _get_float_param(params, scale_key) scale = ratio * base_factor @@ -3537,10 +3536,10 @@ def write_complete_statistical_analysis( profit_aim: float, risk_reward_ratio: float, seed: int, - real_df: Optional[pd.DataFrame] = None, + real_df: pd.DataFrame | None = None, *, adjust_method: str = ADJUST_METHODS[0], - stats_seed: Optional[int] = None, + stats_seed: int | None = None, strict_diagnostics: bool = False, bootstrap_resamples: int = 10000, skip_partial_dependence: bool = False, @@ -3590,7 +3589,7 @@ def write_complete_statistical_analysis( sep += "|" + "-" * (len(str(c)) + 2) sep += "|\n" # Rows - rows: List[str] = [] + rows: list[str] = [] for idx, row in df.iterrows(): vals = [_fmt_val(row[c], ndigits) for c in cols] rows.append("| " + str(idx) + " | " + " | ".join(vals) + " |") @@ -3720,7 +3719,7 @@ def write_complete_statistical_analysis( # Blank separator before overrides block f.write("| | |\n") - overrides_pairs: List[str] = [] + overrides_pairs: list[str] = [] if reward_params: for k, default_v in DEFAULT_MODEL_REWARD_PARAMETERS.items(): if k in ("exit_potential_mode", "potential_gamma"): @@ -3755,7 +3754,7 @@ def write_complete_statistical_analysis( f.write("### 1.3 Component Activation Rates\n\n") f.write("Percentage of samples where each reward component is non-zero:\n\n") comp_share = summary_stats["component_share"].copy() - formatted_rows: List[str] = [ + formatted_rows: list[str] = [ "| Component | Activation Rate |", "|-----------|----------------|", ] @@ -3864,7 +3863,7 @@ def write_complete_statistical_analysis( f.write(_df_to_md(corr_df, index_name=corr_df.index.name, ndigits=4)) _dropped = relationship_stats.get("correlation_dropped") or [] if _dropped: - dropped_strs: List[str] = [str(x) for x in _dropped] + dropped_strs: list[str] = [str(x) for x in _dropped] f.write("\n_Constant features removed: " + ", ".join(dropped_strs) + "._\n\n") # Section 3.5: PBRS Analysis @@ -3933,10 +3932,10 @@ def write_complete_statistical_analysis( f.write("|--------|-------|-------------|\n") f.write(f"| Mean Base Reward | {mean_base:.6f} | Average reward before PBRS |\n") f.write(f"| Std Base Reward | {std_base:.6f} | Variability of base reward |\n") - f.write(f"| Mean PBRS Delta | {mean_pbrs:.6f} | Average γ·Φ(s')−Φ(s) |\n") + f.write(f"| Mean PBRS Delta | {mean_pbrs:.6f} | Average γ·Φ(s')−Φ(s) |\n") # noqa: RUF001 f.write(f"| Std PBRS Delta | {std_pbrs:.6f} | Variability of PBRS delta |\n") f.write( - f"| Mean Invariance Correction | {mean_inv_corr:.6f} | Average reward_shaping − pbrs_delta |\n" + f"| Mean Invariance Correction | {mean_inv_corr:.6f} | Average reward_shaping − pbrs_delta |\n" # noqa: RUF001 ) f.write( f"| Std Invariance Correction | {std_inv_corr:.6f} | Variability of correction |\n" @@ -4093,7 +4092,7 @@ def write_complete_statistical_analysis( # 
Render as markdown without index column header = "| feature | importance_mean | importance_std |\n" sep = "|---------|------------------|----------------|\n" - rows: List[str] = [] + rows: list[str] = [] for _, r in top_imp.iterrows(): rows.append( f"| {r['feature']} | {_fmt_val(r['importance_mean'], 6)} | {_fmt_val(r['importance_std'], 6)} |" @@ -4120,16 +4119,16 @@ def write_complete_statistical_analysis( h = hypothesis_tests["idle_correlation"] f.write("#### 5.1.1 Idle Duration → Idle Penalty Correlation\n\n") f.write(f"**Test Method:** {h['test']}\n\n") - f.write(f"- Spearman ρ: **{h['rho']:.4f}**\n") + f.write(f"- Spearman ρ: **{h['rho']:.4f}**\n") # noqa: RUF001 f.write(f"- p-value: {h['p_value']:.4g}\n") if "p_value_adj" in h: f.write( - f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n" + f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n" # noqa: RUF001 ) f.write(f"- 95% CI: [{h['ci_95'][0]:.4f}, {h['ci_95'][1]:.4f}]\n") f.write(f"- CI width: {(h['ci_95'][1] - h['ci_95'][0]):.4f}\n") f.write(f"- Sample size: {h['n_samples']:,}\n") - f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n") + f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n") # noqa: RUF001 f.write(f"- **Interpretation:** {h['interpretation']}\n\n") if "position_reward_difference" in hypothesis_tests: @@ -4140,11 +4139,11 @@ def write_complete_statistical_analysis( f.write(f"- p-value: {h['p_value']:.4g}\n") if "p_value_adj" in h: f.write( - f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n" + f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n" # noqa: RUF001 ) f.write(f"- Effect size (ε²): {h['effect_size_epsilon_sq']:.4f}\n") f.write(f"- Number of groups: {h['n_groups']}\n") - f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n") + f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n") # noqa: RUF001 f.write(f"- **Interpretation:** {h['interpretation']} effect\n\n") if "pnl_sign_reward_difference" in hypothesis_tests: @@ -4155,11 +4154,11 @@ def write_complete_statistical_analysis( f.write(f"- p-value: {h['p_value']:.4g}\n") if "p_value_adj" in h: f.write( - f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n" + f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n" # noqa: RUF001 ) f.write(f"- Median (PnL+): {h['median_pnl_positive']:.4f}\n") f.write(f"- Median (PnL-): {h['median_pnl_negative']:.4f}\n") - f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n\n") + f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n\n") # noqa: RUF001 # Bootstrap CI if bootstrap_ci: @@ -4408,7 +4407,7 @@ def main() -> None: "action_masking", ] - sim_params: Dict[str, Any] = {} + sim_params: dict[str, Any] = {} for k in candidate_keys: if k in args_dict: v = args_dict[k] @@ -4460,12 +4459,12 @@ def main() -> None: # Generate manifest summarizing key metrics try: manifest_path = args.out_dir / "manifest.json" - resolved_reward_params: Dict[str, Any] = dict( + resolved_reward_params: dict[str, Any] = dict( params ) # already validated/normalized upstream - manifest: Dict[str, Any] = { + manifest: dict[str, Any] = { "generated_at": pd.Timestamp.now().isoformat(), - 
"num_samples": int(len(df)), + "num_samples": len(df), "seed": int(args.seed), "pnl_target": float(profit_aim * risk_reward_ratio), "pvalue_adjust_method": args.pvalue_adjust, @@ -4475,13 +4474,13 @@ def main() -> None: sim_params_dict = df.attrs.get("simulation_params", {}) if not isinstance(sim_params_dict, dict): sim_params_dict = {} - sim_params: Dict[str, Any] = dict(sim_params_dict) + sim_params: dict[str, Any] = dict(sim_params_dict) if sim_params: excluded_for_hash = {"out_dir", "real_episodes"} - sim_params_for_hash: Dict[str, Any] = { + sim_params_for_hash: dict[str, Any] = { k: sim_params[k] for k in sim_params if k not in excluded_for_hash } - _hash_source: Dict[str, Any] = { + _hash_source: dict[str, Any] = { **{f"sim::{k}": sim_params_for_hash[k] for k in sorted(sim_params_for_hash)}, **{ f"reward::{k}": resolved_reward_params[k] diff --git a/ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py b/ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py index b50f2e1..2bd6166 100644 --- a/ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py +++ b/ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py @@ -34,6 +34,7 @@ Exit codes from __future__ import annotations import argparse +import contextlib import itertools import json import math @@ -47,15 +48,15 @@ import sys import tempfile import time from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, TypedDict +from typing import Any, TypedDict try: from typing import NotRequired, Required # Python >=3.11 except ImportError: - from typing_extensions import NotRequired, Required # Python <3.11 + from typing import NotRequired, Required # Python <3.11 -ConfigTuple = Tuple[str, str, float, int, int, int] +ConfigTuple = tuple[str, str, float, int, int, int] SUMMARY_FILENAME = "reward_space_cli.json" @@ -66,25 +67,25 @@ class ScenarioResult(TypedDict): stdout: str stderr: str strict: bool - seconds: Optional[float] + seconds: float | None warnings: int class SummaryResult(TypedDict, total=False): # Required keys total: Required[int] - successes: Required[List[ScenarioResult]] - failures: Required[List[ScenarioResult]] - mean_seconds: Required[Optional[float]] - max_seconds: Required[Optional[float]] - min_seconds: Required[Optional[float]] - median_seconds: Required[Optional[float]] - p95_seconds: Required[Optional[float]] + successes: Required[list[ScenarioResult]] + failures: Required[list[ScenarioResult]] + mean_seconds: Required[float | None] + max_seconds: Required[float | None] + min_seconds: Required[float | None] + median_seconds: Required[float | None] + p95_seconds: Required[float | None] # Extension keys - warnings_breakdown: NotRequired[Dict[str, int]] - seeds: NotRequired[Dict[str, Any]] - metadata: NotRequired[Dict[str, Any]] + warnings_breakdown: NotRequired[dict[str, int]] + seeds: NotRequired[dict[str, Any]] + metadata: NotRequired[dict[str, Any]] interrupted: NotRequired[bool] @@ -102,8 +103,8 @@ def _is_warning_header(line: str) -> bool: def build_arg_matrix( max_scenarios: int = 40, - shuffle_seed: Optional[int] = None, -) -> List[ConfigTuple]: + shuffle_seed: int | None = None, +) -> list[ConfigTuple]: exit_potential_modes = [ "canonical", "non_canonical", @@ -126,7 +127,7 @@ def build_arg_matrix( exit_additive_enabled, ) - full: List[ConfigTuple] = list(product_iter) + full: list[ConfigTuple] = list(product_iter) full = [c for c in full if not (c[0] == "canonical" and (c[4] == 1 or c[5] == 1))] if shuffle_seed is not None: rnd = 
random.Random(shuffle_seed) @@ -135,10 +136,10 @@ def build_arg_matrix( return full step = len(full) / max_scenarios idx_pos = step / 2.0 # Centered sampling - selected: List[ConfigTuple] = [] + selected: list[ConfigTuple] = [] selected_indices: set[int] = set() for _ in range(max_scenarios): - idx = int(round(idx_pos)) + idx = round(idx_pos) if idx < 0: idx = 0 elif idx >= len(full): @@ -177,7 +178,7 @@ def run_scenario( skip_partial_dependence: bool = False, unrealized_pnl: bool = False, full_logs: bool = False, - params: Optional[List[str]] = None, + params: list[str] | None = None, tail_chars: int = 5000, ) -> ScenarioResult: ( @@ -223,7 +224,7 @@ def run_scenario( if strict: cmd.append("--strict_diagnostics") if params: - cmd += ["--params"] + list(params) + cmd += ["--params", *list(params)] start = time.perf_counter() try: proc = subprocess.run(cmd, capture_output=True, text=True, check=False, timeout=timeout) @@ -371,8 +372,8 @@ def main(): scenarios = build_arg_matrix(max_scenarios=args.max_scenarios, shuffle_seed=args.shuffle_seed) # Validate --params basic KEY=VALUE format - valid_params: List[str] = [] - invalid_params: List[str] = [] + valid_params: list[str] = [] + invalid_params: list[str] = [] for p in args.params: if "=" in p: valid_params.append(p) @@ -384,7 +385,7 @@ def main(): args.params = valid_params # Prepare list of (conf, strict) - scenario_pairs: List[Tuple[ConfigTuple, bool]] = [(c, False) for c in scenarios] + scenario_pairs: list[tuple[ConfigTuple, bool]] = [(c, False) for c in scenarios] indices = {conf: idx for idx, conf in enumerate(scenarios, start=1)} n_duplicated = min(max(0, args.strict_sample), len(scenarios)) if n_duplicated > 0: @@ -392,7 +393,7 @@ def main(): for c in scenarios[:n_duplicated]: scenario_pairs.append((c, True)) - results: List[ScenarioResult] = [] + results: list[ScenarioResult] = [] total = len(scenario_pairs) interrupted = False try: @@ -425,7 +426,7 @@ def main(): successes = [r for r in results if r["status"] == "ok"] failures = [r for r in results if r["status"] != "ok"] - durations: List[float] = [ + durations: list[float] = [ float(r["seconds"]) for r in results if isinstance(r["seconds"], float) ] if durations: @@ -436,8 +437,8 @@ def main(): p95_seconds = _sorted[0] else: pos = 0.95 * (n - 1) - i0 = int(math.floor(pos)) - i1 = int(math.ceil(pos)) + i0 = math.floor(pos) + i1 = math.ceil(pos) if i0 == i1: p95_seconds = _sorted[i0] else: @@ -457,7 +458,7 @@ def main(): "p95_seconds": p95_seconds, } # Build warnings breakdown - warnings_breakdown: Dict[str, int] = {} + warnings_breakdown: dict[str, int] = {} for r in results: text = (r["stderr"] + "\n" + r["stdout"]).splitlines() for line in text: @@ -466,7 +467,7 @@ def main(): warnings_breakdown[fp] = warnings_breakdown.get(fp, 0) + 1 # Collect reproducibility metadata - def _git_hash() -> Optional[str]: + def _git_hash() -> str | None: try: proc = subprocess.run( ["git", "rev-parse", "--short", "HEAD"], @@ -504,10 +505,11 @@ def main(): summary["interrupted"] = True # Atomic write to avoid corrupt partial files tmp_fd, tmp_path = tempfile.mkstemp(prefix="_tmp_summary_", dir=str(out_dir)) + tmp_path_obj = Path(tmp_path) try: with os.fdopen(tmp_fd, "w", encoding="utf-8") as fh: json.dump(summary, fh, indent=2) - os.replace(tmp_path, out_dir / SUMMARY_FILENAME) + tmp_path_obj.replace(out_dir / SUMMARY_FILENAME) except Exception: # Best effort fallback try: @@ -515,18 +517,14 @@ def main(): json.dumps(summary, indent=2), encoding="utf-8" ) finally: - if os.path.exists(tmp_path): 
- try: - os.remove(tmp_path) - except OSError: - pass + if tmp_path_obj.exists(): + with contextlib.suppress(OSError): + tmp_path_obj.unlink() else: # Defensive cleanup: remove temp file if atomic replace did not clean up - if os.path.exists(tmp_path): - try: - os.remove(tmp_path) - except OSError: - pass + if tmp_path_obj.exists(): + with contextlib.suppress(OSError): + tmp_path_obj.unlink() print(f"Summary saved to: {out_dir / SUMMARY_FILENAME}") if not interrupted and summary["failures"]: print("Failures detected:") diff --git a/ReforceXY/reward_space_analysis/tests/api/test_api_helpers.py b/ReforceXY/reward_space_analysis/tests/api/test_api_helpers.py index d07eaac..b794f95 100644 --- a/ReforceXY/reward_space_analysis/tests/api/test_api_helpers.py +++ b/ReforceXY/reward_space_analysis/tests/api/test_api_helpers.py @@ -235,7 +235,9 @@ class TestAPIAndHelpers(RewardSpaceTestBase): self.assertTrue(math.isnan(_get_float_param({"k": float("-inf")}, "k", 0.0))) self.assertTrue(math.isnan(_get_float_param({"k": np.nan}, "k", 0.0))) self.assertTrue( - math.isnan(_get_float_param(cast(RewardParams, {"k": cast(Any, [1, 2, 3])}), "k", 0.0)) + math.isnan( + _get_float_param(cast("RewardParams", {"k": cast("Any", [1, 2, 3])}), "k", 0.0) + ) ) def test_get_str_param(self): @@ -284,7 +286,9 @@ class TestAPIAndHelpers(RewardSpaceTestBase): self.assertEqual(_get_int_param({"k": ""}, "k", 5), 5) self.assertEqual(_get_int_param({"k": "abc"}, "k", 5), 5) self.assertEqual(_get_int_param({"k": "NaN"}, "k", 5), 5) - self.assertEqual(_get_int_param(cast(RewardParams, {"k": cast(Any, [1, 2, 3])}), "k", 3), 3) + self.assertEqual( + _get_int_param(cast("RewardParams", {"k": cast("Any", [1, 2, 3])}), "k", 3), 3 + ) self.assertEqual(_get_int_param({}, "missing", "zzz"), 0) def test_argument_parser_construction(self): diff --git a/ReforceXY/reward_space_analysis/tests/cli/test_cli_params_and_csv.py b/ReforceXY/reward_space_analysis/tests/cli/test_cli_params_and_csv.py index 33b03dc..6dd93e7 100644 --- a/ReforceXY/reward_space_analysis/tests/cli/test_cli_params_and_csv.py +++ b/ReforceXY/reward_space_analysis/tests/cli/test_cli_params_and_csv.py @@ -62,11 +62,11 @@ class TestCsvEncoding(RewardSpaceTestBase): self.assertIn("action", df.columns) values = df["action"].tolist() self.assertTrue( - all((float(v).is_integer() for v in values)), + all(float(v).is_integer() for v in values), "Non-integer values detected in 'action' column", ) allowed = {int(action.value) for action in Actions} - self.assertTrue(set((int(v) for v in values)).issubset(allowed)) + self.assertTrue({int(v) for v in values}.issubset(allowed)) class TestParamsPropagation(RewardSpaceTestBase): @@ -181,7 +181,7 @@ class TestParamsPropagation(RewardSpaceTestBase): _assert_cli_success(self, result) manifest_path = out_dir / "manifest.json" self.assertTrue(manifest_path.exists(), "Missing manifest.json") - with open(manifest_path, "r") as f: + with manifest_path.open() as f: manifest = json.load(f) self.assertIn("reward_params", manifest) self.assertIn("simulation_params", manifest) @@ -208,7 +208,7 @@ class TestParamsPropagation(RewardSpaceTestBase): _assert_cli_success(self, result) manifest_path = out_dir / "manifest.json" self.assertTrue(manifest_path.exists(), "Missing manifest.json") - with open(manifest_path, "r") as f: + with manifest_path.open() as f: manifest = json.load(f) self.assertIn("reward_params", manifest) self.assertIn("simulation_params", manifest) diff --git 
a/ReforceXY/reward_space_analysis/tests/components/test_reward_components.py b/ReforceXY/reward_space_analysis/tests/components/test_reward_components.py index caca85b..496fdfa 100644 --- a/ReforceXY/reward_space_analysis/tests/components/test_reward_components.py +++ b/ReforceXY/reward_space_analysis/tests/components/test_reward_components.py @@ -194,7 +194,7 @@ class TestRewardComponents(RewardSpaceTestBase): **Setup:** - PnL: 0.0 (breakeven) - - pnl_target: profit_aim × risk_reward_ratio + - pnl_target: profit_aim * risk_reward_ratio - Parameters: default base_params **Assertions:** @@ -219,7 +219,7 @@ class TestRewardComponents(RewardSpaceTestBase): **Setup:** - PnL: 150% of pnl_target (exceeds target by 50%) - - pnl_target: 0.045 (profit_aim=0.03 × risk_reward_ratio=1.5) + - pnl_target: 0.045 (profit_aim=0.03 * risk_reward_ratio=1.5) - Parameters: win_reward_factor=2.0, pnl_factor_beta=0.5 **Assertions:** @@ -250,7 +250,7 @@ class TestRewardComponents(RewardSpaceTestBase): **Setup:** - PnL: -0.06 (exceeds pnl_target magnitude) - - pnl_target: 0.045 (profit_aim=0.03 × risk_reward_ratio=1.5) + - pnl_target: 0.045 (profit_aim=0.03 * risk_reward_ratio=1.5) - Penalty threshold: pnl < -pnl_target = -0.045 - Parameters: win_reward_factor=2.0, pnl_factor_beta=0.5 @@ -381,7 +381,7 @@ class TestRewardComponents(RewardSpaceTestBase): **Setup:** - PnL: -0.005 (very close to min_unrealized_profit=-0.006) - Efficiency ratio: (-0.005 - (-0.006)) / (0.0 - (-0.006)) ≈ 0.167 (low) - - For losses: coefficient = 1 + weight × (center - ratio) → rewards low ratio + - For losses: coefficient = 1 + weight * (center - ratio) → rewards low ratio - efficiency_weight: 1.0, efficiency_center: 0.5 - Trade context: Long position cutting losses quickly @@ -620,7 +620,7 @@ class TestRewardComponents(RewardSpaceTestBase): pnl_ratio = pnl / pnl_target expected = 1.0 + win_reward_factor * math.tanh(beta * (pnl_ratio - 1.0)) expected_ratios.append(expected) - for obs, exp in zip(ratios_observed, expected_ratios): + for obs, exp in zip(ratios_observed, expected_ratios, strict=False): self.assertFinite(obs, name="observed_ratio") self.assertFinite(exp, name="expected_ratio") self.assertLess( @@ -634,7 +634,7 @@ class TestRewardComponents(RewardSpaceTestBase): Verifies: - max_idle_duration = None → use max_trade_duration as fallback - - penalty(duration=40) ≈ 2 × penalty(duration=20) + - penalty(duration=40) ≈ 2 * penalty(duration=20) - Proportional scaling with idle duration """ base_factor = PARAMS.BASE_FACTOR diff --git a/ReforceXY/reward_space_analysis/tests/constants.py b/ReforceXY/reward_space_analysis/tests/constants.py index 731f997..8db775b 100644 --- a/ReforceXY/reward_space_analysis/tests/constants.py +++ b/ReforceXY/reward_space_analysis/tests/constants.py @@ -408,22 +408,22 @@ STAT_TOL: Final[StatisticalTolerances] = StatisticalTolerances() __all__ = [ - "ToleranceConfig", + "CONTINUITY", + "EXIT_FACTOR", + "PARAMS", + "PBRS", + "SCENARIOS", + "SEEDS", + "STATISTICAL", + "STAT_TOL", + "TOLERANCE", "ContinuityConfig", "ExitFactorConfig", "PBRSConfig", "StatisticalConfig", - "TestSeeds", + "StatisticalTolerances", "TestParameters", "TestScenarios", - "StatisticalTolerances", - "TOLERANCE", - "CONTINUITY", - "EXIT_FACTOR", - "PBRS", - "STATISTICAL", - "SEEDS", - "PARAMS", - "SCENARIOS", - "STAT_TOL", + "TestSeeds", + "ToleranceConfig", ] diff --git a/ReforceXY/reward_space_analysis/tests/helpers/__init__.py b/ReforceXY/reward_space_analysis/tests/helpers/__init__.py index 7160a87..e90a6e7 100644 --- 
a/ReforceXY/reward_space_analysis/tests/helpers/__init__.py +++ b/ReforceXY/reward_space_analysis/tests/helpers/__init__.py @@ -60,52 +60,52 @@ from .warnings import ( ) __all__ = [ - "assert_monotonic_nonincreasing", - "assert_monotonic_nonnegative", - "assert_finite", + "DEFAULT_REWARD_CONFIG", + "DEFAULT_SIMULATION_CONFIG", + "ContextFactory", + "ExitFactorConfig", + "ProgressiveScalingConfig", + "RewardScenarioConfig", + "SimulationConfig", + "StatisticalTestConfig", + "ThresholdTestConfig", + "ValidationCallback", + "ValidationConfig", + "WarningCaptureConfig", + "assert_adjustment_reason_contains", "assert_almost_equal_list", - "assert_trend", "assert_component_sum_integrity", - "assert_progressive_scaling_behavior", - "assert_single_active_component", - "assert_single_active_component_with_additives", - "assert_reward_calculation_scenarios", - "assert_parameter_sensitivity_behavior", - "make_idle_penalty_test_contexts", + "assert_diagnostic_warning", "assert_exit_factor_attenuation_modes", + "assert_exit_factor_invariant_suite", + "assert_exit_factor_kernel_fallback", "assert_exit_factor_plateau_behavior", "assert_exit_mode_mathematical_validation", - "assert_multi_parameter_sensitivity", + "assert_finite", "assert_hold_penalty_threshold_behavior", - "safe_float", - "build_validation_case", - "execute_validation_batch", - "assert_adjustment_reason_contains", - "run_strict_validation_failure_cases", - "run_relaxed_validation_adjustment_cases", - "assert_exit_factor_invariant_suite", - "assert_exit_factor_kernel_fallback", - "assert_relaxed_multi_reason_aggregation", - "assert_pbrs_invariance_report_classification", - "assert_pbrs_canonical_sum_within_tolerance", + "assert_monotonic_nonincreasing", + "assert_monotonic_nonnegative", + "assert_multi_parameter_sensitivity", + "assert_no_warnings", "assert_non_canonical_shaping_exceeds", + "assert_parameter_sensitivity_behavior", + "assert_pbrs_canonical_sum_within_tolerance", + "assert_pbrs_invariance_report_classification", + "assert_progressive_scaling_behavior", + "assert_relaxed_multi_reason_aggregation", + "assert_reward_calculation_scenarios", + "assert_single_active_component", + "assert_single_active_component_with_additives", + "assert_trend", + "build_validation_case", "calculate_reward_with_defaults", + "capture_warnings", + "execute_validation_batch", "get_exit_factor_with_defaults", + "make_idle_penalty_test_contexts", + "run_relaxed_validation_adjustment_cases", + "run_strict_validation_failure_cases", + "safe_float", "simulate_samples_with_defaults", - "RewardScenarioConfig", - "ValidationConfig", - "ThresholdTestConfig", - "ProgressiveScalingConfig", - "ExitFactorConfig", - "StatisticalTestConfig", - "SimulationConfig", - "WarningCaptureConfig", - "ValidationCallback", - "ContextFactory", - "DEFAULT_REWARD_CONFIG", - "DEFAULT_SIMULATION_CONFIG", - "capture_warnings", - "assert_diagnostic_warning", - "assert_no_warnings", "validate_warning_content", ] diff --git a/ReforceXY/reward_space_analysis/tests/helpers/assertions.py b/ReforceXY/reward_space_analysis/tests/helpers/assertions.py index 530af44..76b6cc1 100644 --- a/ReforceXY/reward_space_analysis/tests/helpers/assertions.py +++ b/ReforceXY/reward_space_analysis/tests/helpers/assertions.py @@ -4,7 +4,9 @@ These functions centralize common numeric and behavioral checks to enforce single invariant ownership and reduce duplication across taxonomy modules. 
""" -from typing import Any, Dict, List, Sequence, Tuple +import itertools +from collections.abc import Sequence +from typing import Any import numpy as np @@ -358,7 +360,7 @@ def assert_single_active_component_with_additives( def assert_reward_calculation_scenarios( test_case, - scenarios: List[Tuple[Any, Dict[str, Any], str]], + scenarios: list[tuple[Any, dict[str, Any], str]], config: RewardScenarioConfig, validation_fn, ): @@ -405,9 +407,9 @@ def assert_reward_calculation_scenarios( def assert_parameter_sensitivity_behavior( test_case, - parameter_variations: List[Dict[str, Any]], + parameter_variations: list[dict[str, Any]], base_context, - base_params: Dict[str, Any], + base_params: dict[str, Any], component_name: str, expected_trend: str, config: RewardScenarioConfig, @@ -486,7 +488,7 @@ def assert_parameter_sensitivity_behavior( def make_idle_penalty_test_contexts( context_factory_fn, idle_duration_scenarios: Sequence[int], - base_context_kwargs: Dict[str, Any] | None = None, + base_context_kwargs: dict[str, Any] | None = None, ): """Generate contexts for idle penalty testing with varying durations. @@ -541,7 +543,7 @@ def assert_exit_factor_attenuation_modes( test_case: Test case instance with assertion methods base_factor: Base scaling factor pnl: Realized profit/loss - pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio) + pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio) context: RewardContext for efficiency coefficient calculation attenuation_modes: List of mode names to test base_params_fn: Factory function for creating parameter dicts @@ -588,12 +590,14 @@ def assert_exit_factor_attenuation_modes( if mode == "plateau_linear": grace = float(mode_params["exit_plateau_grace"]) filtered = [ - (r, v) for r, v in zip(ratios, values) if r >= grace - tolerance_relaxed + (r, v) + for r, v in zip(ratios, values, strict=False) + if r >= grace - tolerance_relaxed ] values_to_check = [v for _, v in filtered] else: values_to_check = values - for earlier, later in zip(values_to_check, values_to_check[1:]): + for earlier, later in itertools.pairwise(values_to_check): test_case.assertLessEqual( later, earlier + tolerance_relaxed, f"Non-monotonic attenuation in mode={mode}" ) @@ -602,7 +606,7 @@ def assert_exit_factor_attenuation_modes( def assert_exit_mode_mathematical_validation( test_case, context, - params: Dict[str, Any], + params: dict[str, Any], base_factor: float, profit_aim: float, risk_reward_ratio: float, @@ -704,16 +708,16 @@ def assert_exit_mode_mathematical_validation( reward_half_life.exit_component, reward_linear.exit_component, ] - test_case.assertTrue(all((r > 0 for r in rewards))) - unique_rewards = set((f"{r:.6f}" for r in rewards)) + test_case.assertTrue(all(r > 0 for r in rewards)) + unique_rewards = {f"{r:.6f}" for r in rewards} test_case.assertGreater(len(unique_rewards), 1) def assert_multi_parameter_sensitivity( test_case, - parameter_test_cases: List[Tuple[float, float, str]], + parameter_test_cases: list[tuple[float, float, str]], context_factory_fn, - base_params: Dict[str, Any], + base_params: dict[str, Any], config: RewardScenarioConfig, ): """Validate reward behavior across multiple parameter combinations. 
@@ -781,7 +785,7 @@ def assert_multi_parameter_sensitivity( def assert_hold_penalty_threshold_behavior( test_case, context_factory_fn, - params: Dict[str, Any], + params: dict[str, Any], base_factor: float, profit_aim: float, risk_reward_ratio: float, @@ -842,11 +846,11 @@ def assert_hold_penalty_threshold_behavior( def build_validation_case( - param_updates: Dict[str, Any], + param_updates: dict[str, Any], strict: bool, expect_error: bool = False, expected_reason_substrings: Sequence[str] | None = None, -) -> Dict[str, Any]: +) -> dict[str, Any]: """Build a structured validation test case descriptor. Creates a standardized test case dictionary for parameter validation testing, @@ -876,7 +880,7 @@ def build_validation_case( } -def execute_validation_batch(test_case, cases: Sequence[Dict[str, Any]], validate_fn): +def execute_validation_batch(test_case, cases: Sequence[dict[str, Any]], validate_fn): """Execute a batch of parameter validation test cases. Runs multiple validation scenarios in batch, handling both strict (error-raising) @@ -903,7 +907,7 @@ def execute_validation_batch(test_case, cases: Sequence[Dict[str, Any]], validat params = case["params"].copy() strict_flag = case["strict"] if strict_flag and case["expect_error"]: - test_case.assertRaises(Exception, validate_fn, params, True) + test_case.assertRaises(ValueError, validate_fn, params, True) continue result = validate_fn(params, strict=strict_flag) if isinstance(result, tuple) and len(result) == 2 and isinstance(result[0], dict): @@ -922,7 +926,7 @@ def execute_validation_batch(test_case, cases: Sequence[Dict[str, Any]], validat def assert_adjustment_reason_contains( - test_case, adjustments: Dict[str, Dict[str, Any]], key: str, expected_substrings: Sequence[str] + test_case, adjustments: dict[str, dict[str, Any]], key: str, expected_substrings: Sequence[str] ): """Assert adjustment reason contains all expected substrings. @@ -953,7 +957,7 @@ def assert_adjustment_reason_contains( def run_strict_validation_failure_cases( - test_case, failure_params_list: Sequence[Dict[str, Any]], validate_fn + test_case, failure_params_list: Sequence[dict[str, Any]], validate_fn ): """Batch test strict validation failures. @@ -983,7 +987,7 @@ def run_strict_validation_failure_cases( def run_relaxed_validation_adjustment_cases( test_case, - relaxed_cases: Sequence[Tuple[Dict[str, Any], Sequence[str]]], + relaxed_cases: Sequence[tuple[dict[str, Any], Sequence[str]]], validate_fn, ): """Batch test relaxed validation adjustments. @@ -1020,7 +1024,7 @@ def run_relaxed_validation_adjustment_cases( def assert_exit_factor_invariant_suite( - test_case, suite_cases: Sequence[Dict[str, Any]], exit_factor_fn + test_case, suite_cases: Sequence[dict[str, Any]], exit_factor_fn ): """Validate exit factor invariants across multiple scenarios. 
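
Note: the strict-mode branch in execute_validation_batch now expects ValueError rather than a bare Exception. Assuming validate_reward_parameters keeps its documented contract (strict mode raises on out-of-bounds overrides, relaxed mode clamps to the bound and records a reason), a usage sketch looks like this; the override value is illustrative:

    from reward_space_analysis import validate_reward_parameters

    bad_params = {"base_factor": -5.0}  # below the documented minimum of 0.0

    # Relaxed mode: the value is clamped and the adjustment reason is recorded.
    sanitized, adjustments = validate_reward_parameters(bad_params, strict=False)
    print(sanitized["base_factor"], adjustments.get("base_factor", {}).get("reason"))

    # Strict mode: the same override is rejected outright.
    try:
        validate_reward_parameters(bad_params, strict=True)
    except ValueError as exc:
        print(f"strict validation rejected the override: {exc}")
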
@@ -1033,7 +1037,7 @@ def assert_exit_factor_invariant_suite( suite_cases: List of scenario dicts with keys: - base_factor: Base scaling factor - pnl: Realized profit/loss - - pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio) for coefficient calculation + - pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio) for coefficient calculation - context: RewardContext for efficiency coefficient - duration_ratio: Duration ratio (0-2) - params: Parameter dictionary @@ -1088,8 +1092,8 @@ def assert_exit_factor_kernel_fallback( pnl_target: float, duration_ratio: float, context, - bad_params: Dict[str, Any], - reference_params: Dict[str, Any], + bad_params: dict[str, Any], + reference_params: dict[str, Any], risk_reward_ratio: float, ): """Validate exit factor fallback behavior on kernel failure. @@ -1141,8 +1145,8 @@ def assert_exit_factor_kernel_fallback( def assert_relaxed_multi_reason_aggregation( test_case, validate_fn, - params: Dict[str, Any], - key_expectations: Dict[str, Sequence[str]], + params: dict[str, Any], + key_expectations: dict[str, Sequence[str]], ): """Validate relaxed validation produces expected adjustment reasons. @@ -1268,7 +1272,7 @@ def assert_exit_factor_plateau_behavior( exit_factor_fn: Exit factor calculation function (_get_exit_factor) base_factor: Base factor for exit calculation pnl: PnL value - pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio) for coefficient calculation + pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio) for coefficient calculation context: RewardContext for efficiency coefficient plateau_params: Parameters dict with plateau configuration grace: Grace period threshold (exit_plateau_grace value) @@ -1314,7 +1318,7 @@ def assert_exit_factor_plateau_behavior( def calculate_reward_with_defaults( context, - params: Dict[str, Any], + params: dict[str, Any], config: RewardScenarioConfig | None = None, **overrides, ): @@ -1376,7 +1380,7 @@ def get_exit_factor_with_defaults( pnl: float, duration_ratio: float, context, - params: Dict[str, Any], + params: dict[str, Any], base_factor: float | None = None, pnl_target: float | None = None, risk_reward_ratio: float | None = None, @@ -1427,7 +1431,7 @@ def get_exit_factor_with_defaults( def simulate_samples_with_defaults( - params: Dict[str, Any], + params: dict[str, Any], config: SimulationConfig | None = None, base_factor: float | None = None, profit_aim: float | None = None, diff --git a/ReforceXY/reward_space_analysis/tests/helpers/configs.py b/ReforceXY/reward_space_analysis/tests/helpers/configs.py index 12742dd..d3ad700 100644 --- a/ReforceXY/reward_space_analysis/tests/helpers/configs.py +++ b/ReforceXY/reward_space_analysis/tests/helpers/configs.py @@ -21,8 +21,8 @@ Usage: ... ) """ +from collections.abc import Callable from dataclasses import dataclass -from typing import Callable, Optional from ..constants import PARAMS, SEEDS, STATISTICAL, TOLERANCE @@ -67,7 +67,7 @@ class ValidationConfig: tolerance_strict: float = TOLERANCE.IDENTITY_STRICT tolerance_relaxed: float = TOLERANCE.IDENTITY_RELAXED - exclude_components: Optional[list[str]] = None + exclude_components: list[str] | None = None component_description: str = "reward components" @@ -117,7 +117,7 @@ class ExitFactorConfig: decomposition, attenuation mode and plateau behavior. 
The exit factor is computed as: - exit_factor = base_factor × time_attenuation × pnl_target × efficiency + exit_factor = base_factor * time_attenuation * pnl_target * efficiency Attributes: base_factor: Base scaling factor @@ -160,7 +160,7 @@ class StatisticalTestConfig: n_bootstrap: int = STATISTICAL.BOOTSTRAP_DEFAULT_ITERATIONS confidence_level: float = 0.95 seed: int = SEEDS.BASE - adjust_method: Optional[str] = None + adjust_method: str | None = None alpha: float = 0.05 @@ -236,16 +236,16 @@ DEFAULT_SIMULATION_CONFIG: SimulationConfig = SimulationConfig( __all__ = [ - "RewardScenarioConfig", - "ValidationConfig", - "ThresholdTestConfig", - "ProgressiveScalingConfig", + "DEFAULT_REWARD_CONFIG", + "DEFAULT_SIMULATION_CONFIG", + "ContextFactory", "ExitFactorConfig", - "StatisticalTestConfig", + "ProgressiveScalingConfig", + "RewardScenarioConfig", "SimulationConfig", - "WarningCaptureConfig", + "StatisticalTestConfig", + "ThresholdTestConfig", "ValidationCallback", - "ContextFactory", - "DEFAULT_REWARD_CONFIG", - "DEFAULT_SIMULATION_CONFIG", + "ValidationConfig", + "WarningCaptureConfig", ] diff --git a/ReforceXY/reward_space_analysis/tests/helpers/warnings.py b/ReforceXY/reward_space_analysis/tests/helpers/warnings.py index 9de1319..fb12852 100644 --- a/ReforceXY/reward_space_analysis/tests/helpers/warnings.py +++ b/ReforceXY/reward_space_analysis/tests/helpers/warnings.py @@ -16,7 +16,7 @@ Usage: import warnings from contextlib import contextmanager -from typing import Any, Optional +from typing import Any import reward_space_analysis @@ -55,7 +55,7 @@ def capture_warnings(warning_category: type[Warning] = Warning, always_capture: @contextmanager def assert_diagnostic_warning( expected_substrings: list[str], - warning_category: Optional[type[Warning]] = None, + warning_category: type[Warning] | None = None, strict_mode: bool = True, ): """Context manager that captures warnings and asserts their presence. 
@@ -192,8 +192,8 @@ def validate_warning_content( __all__ = [ - "capture_warnings", "assert_diagnostic_warning", "assert_no_warnings", + "capture_warnings", "validate_warning_content", ] diff --git a/ReforceXY/reward_space_analysis/tests/integration/test_integration.py b/ReforceXY/reward_space_analysis/tests/integration/test_integration.py index e1eadef..e48c17a 100644 --- a/ReforceXY/reward_space_analysis/tests/integration/test_integration.py +++ b/ReforceXY/reward_space_analysis/tests/integration/test_integration.py @@ -90,7 +90,7 @@ class TestIntegration(RewardSpaceTestBase): _assert_cli_success(self, result2) for run_dir in ["run1", "run2"]: - with open(self.output_path / run_dir / "manifest.json", "r") as f: + with (self.output_path / run_dir / "manifest.json").open() as f: manifest = json.load(f) required_keys = { "generated_at", @@ -112,9 +112,9 @@ class TestIntegration(RewardSpaceTestBase): self.assertEqual(manifest["num_samples"], SCENARIOS.SAMPLE_SIZE_SMALL) self.assertEqual(manifest["seed"], SEEDS.BASE) - with open(self.output_path / "run1" / "manifest.json", "r") as f: + with (self.output_path / "run1" / "manifest.json").open() as f: manifest1 = json.load(f) - with open(self.output_path / "run2" / "manifest.json", "r") as f: + with (self.output_path / "run2" / "manifest.json").open() as f: manifest2 = json.load(f) self.assertEqual( manifest1["params_hash"], diff --git a/ReforceXY/reward_space_analysis/tests/pbrs/test_pbrs.py b/ReforceXY/reward_space_analysis/tests/pbrs/test_pbrs.py index aaac0d3..85dc8e2 100644 --- a/ReforceXY/reward_space_analysis/tests/pbrs/test_pbrs.py +++ b/ReforceXY/reward_space_analysis/tests/pbrs/test_pbrs.py @@ -492,8 +492,8 @@ class TestPBRS(RewardSpaceTestBase): terminal_next_potentials, shaping_values = self._canonical_sweep(params) self.assertEqual(params, params_before) if terminal_next_potentials: - self.assertTrue(all((abs(p) < PBRS.TERMINAL_TOL for p in terminal_next_potentials))) - max_abs = max((abs(v) for v in shaping_values)) if shaping_values else 0.0 + self.assertTrue(all(abs(p) < PBRS.TERMINAL_TOL for p in terminal_next_potentials)) + max_abs = max(abs(v) for v in shaping_values) if shaping_values else 0.0 self.assertLessEqual(max_abs, PBRS.MAX_ABS_SHAPING) def test_progressive_release_negative_decay_clamped(self): @@ -528,7 +528,7 @@ class TestPBRS(RewardSpaceTestBase): gamma = float(gamma_fallback) except Exception: gamma = 0.95 - # PBRS shaping Δ = γ·Φ(next) − Φ(prev). Here Φ(next)=Φ(prev) since decay clamps to 0. + # PBRS shaping Δ = γ·Φ(next) − Φ(prev). Here Φ(next)=Φ(prev) since decay clamps to 0. 
# noqa: RUF003 self.assertLessEqual( abs(shaping - ((gamma - 1.0) * prev_potential)), TOLERANCE.GENERIC_EQ, @@ -788,7 +788,7 @@ class TestPBRS(RewardSpaceTestBase): ) execute_validation_batch( self, - [success_case] + strict_failures + [relaxed_case], + [success_case, *strict_failures, relaxed_case], validate_reward_parameters, ) params_relaxed = DEFAULT_MODEL_REWARD_PARAMETERS.copy() @@ -815,13 +815,13 @@ class TestPBRS(RewardSpaceTestBase): def test_compute_exit_potential_mode_differences(self): """Exit potential modes: canonical vs spike_cancel shaping magnitude differences.""" gamma = 0.93 - base_common = dict( - hold_potential_enabled=True, - potential_gamma=gamma, - entry_additive_enabled=False, - exit_additive_enabled=False, - hold_potential_ratio=1.0, - ) + base_common = { + "hold_potential_enabled": True, + "potential_gamma": gamma, + "entry_additive_enabled": False, + "exit_additive_enabled": False, + "hold_potential_ratio": 1.0, + } ctx_pnl = 0.012 ctx_dur_ratio = 0.3 params_can = self.base_params(exit_potential_mode="canonical", **base_common) @@ -1113,7 +1113,7 @@ class TestPBRS(RewardSpaceTestBase): self.assertLessEqual(abs(shap), PBRS.MAX_ABS_SHAPING) # With bounded transforms and hold_potential_ratio=1: - # |Φ(s)| <= base_factor and |Δ| <= (1+γ)*base_factor + # |Φ(s)| <= base_factor and |Δ| <= (1+γ)*base_factor # noqa: RUF003 self.assertLessEqual(abs(float(shap)), (1.0 + gamma) * PARAMS.BASE_FACTOR) def test_report_cumulative_invariance_aggregation(self): @@ -1159,10 +1159,7 @@ class TestPBRS(RewardSpaceTestBase): if abs(inc) > max_abs_step: max_abs_step = abs(inc) steps += 1 - if is_exit: - prev_potential = 0.0 - else: - prev_potential = next_potential + prev_potential = 0.0 if is_exit else next_potential mean_drift = telescoping_sum / max(1, steps) self.assertLess( abs(mean_drift), diff --git a/ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py b/ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py index 6962da7..490cec6 100644 --- a/ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py +++ b/ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py @@ -44,8 +44,8 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): def test_decomposition_integrity(self): """reward must equal the single active core component under mutually exclusive scenarios (idle/hold/exit/invalid).""" scenarios = [ - dict( - ctx=self.make_ctx( + { + "ctx": self.make_ctx( pnl=0.0, trade_duration=0, idle_duration=25, @@ -54,10 +54,10 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): position=Positions.Neutral, action=Actions.Neutral, ), - active="idle_penalty", - ), - dict( - ctx=self.make_ctx( + "active": "idle_penalty", + }, + { + "ctx": self.make_ctx( pnl=0.0, trade_duration=150, idle_duration=0, @@ -66,10 +66,10 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): position=Positions.Long, action=Actions.Neutral, ), - active="hold_penalty", - ), - dict( - ctx=self.make_ctx( + "active": "hold_penalty", + }, + { + "ctx": self.make_ctx( pnl=PARAMS.PROFIT_AIM, trade_duration=60, idle_duration=0, @@ -78,10 +78,10 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): position=Positions.Long, action=Actions.Long_exit, ), - active="exit_component", - ), - dict( - ctx=self.make_ctx( + "active": "exit_component", + }, + { + "ctx": self.make_ctx( pnl=0.01, trade_duration=10, idle_duration=0, @@ -90,8 +90,8 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): position=Positions.Short, 
action=Actions.Long_exit, ), - active="invalid_penalty", - ), + "active": "invalid_penalty", + }, ] for sc in scenarios: ctx_obj = sc["ctx"] @@ -178,7 +178,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): ) # Part 2: Monotonic attenuation validation - modes = list(ATTENUATION_MODES) + ["plateau_linear"] + modes = [*list(ATTENUATION_MODES), "plateau_linear"] test_pnl = 0.05 test_context = self.make_ctx( pnl=test_pnl, @@ -232,12 +232,9 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): self.assertTrue(runtime_warnings) self.assertTrue( any( - ( - ">" in str(w.message) - and "threshold" in str(w.message) - or "|exit_factor|=" in str(w.message) - for w in runtime_warnings - ) + (">" in str(w.message) and "threshold" in str(w.message)) + or "|exit_factor|=" in str(w.message) + for w in runtime_warnings ) ) @@ -298,10 +295,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): params, PARAMS.RISK_REWARD_RATIO, ) - if 0.0 < tau <= 1.0: - alpha = -math.log(tau) / math.log(2.0) - else: - alpha = 1.0 + alpha = -math.log(tau) / math.log(2.0) if 0.0 < tau <= 1.0 else 1.0 expected_ratio = 1.0 / (1.0 + duration_ratio) ** alpha observed_ratio = f1 / f0 if f0 != 0 else np.nan self.assertFinite(observed_ratio, name="observed_ratio") @@ -656,7 +650,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): f"Scaling ratio too large (ratio={ratio:.2f})", ) - # === Robustness invariants 102–105 === + # === Robustness invariants 102–105 === # noqa: RUF003 # Owns invariant: robustness-exit-mode-fallback-102 def test_robustness_102_unknown_exit_mode_fallback_linear(self): """Invariant 102: Unknown exit_attenuation_mode gracefully warns and falls back to linear kernel.""" diff --git a/ReforceXY/reward_space_analysis/tests/statistics/test_feature_analysis_failures.py b/ReforceXY/reward_space_analysis/tests/statistics/test_feature_analysis_failures.py index 2147c23..5434718 100644 --- a/ReforceXY/reward_space_analysis/tests/statistics/test_feature_analysis_failures.py +++ b/ReforceXY/reward_space_analysis/tests/statistics/test_feature_analysis_failures.py @@ -80,7 +80,7 @@ def test_feature_analysis_empty_frame(): - model is None """ df = _minimal_df(0) # empty - importance_df, stats, partial_deps, model = _perform_feature_analysis( + importance_df, stats, _partial_deps, model = _perform_feature_analysis( df, seed=SEEDS.FEATURE_EMPTY, skip_partial_dependence=True ) assert importance_df.empty @@ -102,7 +102,7 @@ def test_feature_analysis_single_feature_path(): """ rng = np.random.default_rng(SEEDS.FEATURE_PRIME_11) df = pd.DataFrame({"pnl": rng.normal(0, 1, 25), "reward": rng.normal(0, 1, 25)}) - importance_df, stats, partial_deps, model = _perform_feature_analysis( + importance_df, stats, _partial_deps, model = _perform_feature_analysis( df, seed=SEEDS.FEATURE_PRIME_11, skip_partial_dependence=True ) assert stats["n_features"] == 1 @@ -132,7 +132,7 @@ def test_feature_analysis_nans_present_path(): "reward": rng.normal(0, 1, 40), } ) - importance_df, stats, partial_deps, model = _perform_feature_analysis( + importance_df, stats, _partial_deps, model = _perform_feature_analysis( df, seed=SEEDS.FEATURE_PRIME_13, skip_partial_dependence=True ) # Should hit NaN stub path (model_fitted False) @@ -161,12 +161,12 @@ def test_feature_analysis_model_fitting_failure(monkeypatch): if RandomForestRegressor is None: # type: ignore[comparison-overlap] pytest.skip("sklearn components unavailable; skipping model fitting failure test") - def boom(self, *a, **kw): # noqa: D401 + def 
boom(self, *a, **kw): raise RuntimeError("forced fit failure") monkeypatch.setattr(RandomForestRegressor, "fit", boom) df = _minimal_df(50) - importance_df, stats, partial_deps, model = _perform_feature_analysis( + importance_df, stats, _partial_deps, model = _perform_feature_analysis( df, seed=SEEDS.FEATURE_PRIME_21, skip_partial_dependence=True ) assert stats["model_fitted"] is False @@ -194,7 +194,7 @@ def test_feature_analysis_permutation_failure_partial_dependence(monkeypatch): """ # Monkeypatch permutation_importance to raise while allowing partial dependence - def perm_boom(*a, **kw): # noqa: D401 + def perm_boom(*a, **kw): raise RuntimeError("forced permutation failure") monkeypatch.setattr("reward_space_analysis.permutation_importance", perm_boom) @@ -249,7 +249,7 @@ def test_feature_analysis_import_fallback(monkeypatch): def test_module_level_sklearn_import_failure_reload(): - """Force module-level sklearn import failure to execute fallback block (lines 32–42). + """Force module-level sklearn import failure to execute fallback block (lines 32-42). Strategy: - Temporarily monkeypatch builtins.__import__ to raise on any 'sklearn' import. @@ -261,7 +261,7 @@ def test_module_level_sklearn_import_failure_reload(): orig_mod = sys.modules.get("reward_space_analysis") orig_import = builtins.__import__ - def fake_import(name, *args, **kwargs): # noqa: D401 + def fake_import(name, *args, **kwargs): if name.startswith("sklearn"): raise RuntimeError("forced sklearn import failure") return orig_import(name, *args, **kwargs) @@ -274,10 +274,10 @@ def test_module_level_sklearn_import_failure_reload(): reloaded_module = importlib.import_module("reward_space_analysis") # Fallback assigns sklearn symbols to None - assert getattr(reloaded_module, "RandomForestRegressor") is None - assert getattr(reloaded_module, "train_test_split") is None - assert getattr(reloaded_module, "permutation_importance") is None - assert getattr(reloaded_module, "r2_score") is None + assert reloaded_module.RandomForestRegressor is None + assert reloaded_module.train_test_split is None + assert reloaded_module.permutation_importance is None + assert reloaded_module.r2_score is None # Perform feature analysis should raise ImportError under missing components df = _minimal_df(15) with pytest.raises(ImportError): diff --git a/ReforceXY/reward_space_analysis/tests/statistics/test_statistics.py b/ReforceXY/reward_space_analysis/tests/statistics/test_statistics.py index e5542aa..7487bfb 100644 --- a/ReforceXY/reward_space_analysis/tests/statistics/test_statistics.py +++ b/ReforceXY/reward_space_analysis/tests/statistics/test_statistics.py @@ -45,7 +45,7 @@ class TestStatistics(RewardSpaceTestBase): # Use existing helper to get synthetic stats df (small for speed) df = self.make_stats_df(n=120, seed=SEEDS.BASE, idle_pattern="mixed") try: - importance_df, analysis_stats, partial_deps, model = _perform_feature_analysis( + importance_df, analysis_stats, partial_deps, _model = _perform_feature_analysis( df, seed=SEEDS.BASE, skip_partial_dependence=True, rf_n_jobs=1, perm_n_jobs=1 ) except ImportError: @@ -135,15 +135,13 @@ class TestStatistics(RewardSpaceTestBase): for metric_name, value in metrics.items(): if "pnl" in metric_name: if any( - ( - suffix in metric_name - for suffix in [ - "js_distance", - "ks_statistic", - "wasserstein", - "kl_divergence", - ] - ) + suffix in metric_name + for suffix in [ + "js_distance", + "ks_statistic", + "wasserstein", + "kl_divergence", + ] ): self.assertDistanceMetric(value, name=metric_name) 
else: @@ -180,7 +178,7 @@ class TestStatistics(RewardSpaceTestBase): "Idle duration and reward arrays should have same length", ) self.assertTrue( - all((d >= 0 for d in idle_dur)), "Idle durations should be non-negative" + all(d >= 0 for d in idle_dur), "Idle durations should be non-negative" ) negative_rewards = (idle_rew < 0).sum() total_rewards = len(idle_rew) @@ -231,7 +229,7 @@ class TestStatistics(RewardSpaceTestBase): diagnostics = distribution_diagnostics(df) expected_prefixes = ["reward_", "pnl_"] for prefix in expected_prefixes: - matching_keys = [key for key in diagnostics.keys() if key.startswith(prefix)] + matching_keys = [key for key in diagnostics if key.startswith(prefix)] self.assertGreater(len(matching_keys), 0, f"Should have diagnostics for {prefix}") expected_suffixes = ["mean", "std", "skewness", "kurtosis"] for suffix in expected_suffixes: @@ -509,7 +507,7 @@ class TestStatistics(RewardSpaceTestBase): df, adjust_method="benjamini_hochberg", seed=SEEDS.REPRODUCIBILITY ) self.assertGreater(len(results_adj), 0) - for name, res in results_adj.items(): + for _name, res in results_adj.items(): self.assertIn("p_value", res) self.assertIn("p_value_adj", res) self.assertIn("significant_adj", res) @@ -542,8 +540,8 @@ class TestStatistics(RewardSpaceTestBase): large = self._shift_scale_df(SCENARIOS.SAMPLE_SIZE_LARGE) res_small = bootstrap_confidence_intervals(small, ["reward"], n_bootstrap=400) res_large = bootstrap_confidence_intervals(large, ["reward"], n_bootstrap=400) - _, lo_s, hi_s = list(res_small.values())[0] - _, lo_l, hi_l = list(res_large.values())[0] + _, lo_s, hi_s = next(iter(res_small.values())) + _, lo_l, hi_l = next(iter(res_large.values())) hw_small = (hi_s - lo_s) / 2.0 hw_large = (hi_l - lo_l) / 2.0 self.assertFinite(hw_small, name="hw_small") diff --git a/ReforceXY/reward_space_analysis/tests/test_base.py b/ReforceXY/reward_space_analysis/tests/test_base.py index 0867b08..86f3fd0 100644 --- a/ReforceXY/reward_space_analysis/tests/test_base.py +++ b/ReforceXY/reward_space_analysis/tests/test_base.py @@ -1,13 +1,15 @@ #!/usr/bin/env python3 """Base class and utilities for reward space analysis tests.""" +import itertools import math import random import shutil import tempfile import unittest +from collections.abc import Iterable, Sequence from pathlib import Path -from typing import Any, Dict, Iterable, Optional, Sequence, Union +from typing import Any import numpy as np import pandas as pd @@ -61,7 +63,7 @@ PBRS_INTEGRATION_PARAMS = [ "entry_additive_enabled", "exit_additive_enabled", ] -PBRS_REQUIRED_PARAMS = PBRS_INTEGRATION_PARAMS + ["exit_potential_mode"] +PBRS_REQUIRED_PARAMS = [*PBRS_INTEGRATION_PARAMS, "exit_potential_mode"] class RewardSpaceTestBase(unittest.TestCase): @@ -105,9 +107,9 @@ class RewardSpaceTestBase(unittest.TestCase): action=action, ) - def base_params(self, **overrides) -> Dict[str, Any]: + def base_params(self, **overrides) -> dict[str, Any]: """Return fresh copy of default reward params with overrides.""" - params: Dict[str, Any] = DEFAULT_MODEL_REWARD_PARAMETERS.copy() + params: dict[str, Any] = DEFAULT_MODEL_REWARD_PARAMETERS.copy() params.update(overrides) return params @@ -115,8 +117,8 @@ class RewardSpaceTestBase(unittest.TestCase): self, params: dict, *, - iterations: Optional[int] = None, - terminal_prob: Optional[float] = None, + iterations: int | None = None, + terminal_prob: float | None = None, seed: int = SEEDS.CANONICAL_SWEEP, ) -> tuple[list[float], list[float]]: """Run a lightweight canonical invariance sweep. 
@@ -171,10 +173,10 @@ class RewardSpaceTestBase(unittest.TestCase): reward_mean: float = 0.0, reward_std: float = 1.0, pnl_mean: float = 0.01, - pnl_std: Optional[float] = None, + pnl_std: float | None = None, trade_duration_dist: str = "uniform", idle_pattern: str = "mixed", - seed: Optional[int] = None, + seed: int | None = None, ) -> pd.DataFrame: """Generate a synthetic statistical DataFrame. @@ -235,11 +237,11 @@ class RewardSpaceTestBase(unittest.TestCase): def assertAlmostEqualFloat( self, - first: Union[float, int], - second: Union[float, int], - tolerance: Optional[float] = None, - rtol: Optional[float] = None, - msg: Union[str, None] = None, + first: float | int, + second: float | int, + tolerance: float | None = None, + rtol: float | None = None, + msg: str | None = None, ) -> None: """Compare floats with absolute and optional relative tolerance. @@ -264,14 +266,14 @@ class RewardSpaceTestBase(unittest.TestCase): or f"Difference {diff} exceeds tolerance {tolerance} and relative tolerance {rtol} (a={first}, b={second})" ) - def assertPValue(self, value: Union[float, int], msg: str = "") -> None: + def assertPValue(self, value: float | int, msg: str = "") -> None: """Assert a p-value is finite and within [0,1].""" self.assertFinite(value, name="p-value") self.assertGreaterEqual(value, 0.0, msg or f"p-value < 0: {value}") self.assertLessEqual(value, 1.0, msg or f"p-value > 1: {value}") def assertPlacesEqual( - self, a: Union[float, int], b: Union[float, int], places: int, msg: Optional[str] = None + self, a: float | int, b: float | int, places: int, msg: str | None = None ) -> None: """Bridge for legacy places-based approximate equality. @@ -283,10 +285,10 @@ class RewardSpaceTestBase(unittest.TestCase): def assertDistanceMetric( self, - value: Union[float, int], + value: float | int, *, non_negative: bool = True, - upper: Optional[float] = None, + upper: float | None = None, name: str = "metric", ) -> None: """Generic distance/divergence bounds: finite, optional non-negativity and optional upper bound.""" @@ -298,7 +300,7 @@ class RewardSpaceTestBase(unittest.TestCase): def assertEffectSize( self, - value: Union[float, int], + value: float | int, *, lower: float = -1.0, upper: float = 1.0, @@ -309,17 +311,17 @@ class RewardSpaceTestBase(unittest.TestCase): self.assertGreaterEqual(value, lower, f"{name} < {lower}: {value}") self.assertLessEqual(value, upper, f"{name} > {upper}: {value}") - def assertFinite(self, value: Union[float, int], name: str = "value") -> None: + def assertFinite(self, value: float | int, name: str = "value") -> None: """Assert scalar is finite.""" if not np.isfinite(value): self.fail(f"{name} not finite: {value}") def assertMonotonic( self, - seq: Union[Sequence[Union[float, int]], Iterable[Union[float, int]]], + seq: Sequence[float | int] | Iterable[float | int], *, - non_increasing: Optional[bool] = None, - non_decreasing: Optional[bool] = None, + non_increasing: bool | None = None, + non_decreasing: bool | None = None, tolerance: float = 0.0, name: str = "sequence", ) -> None: @@ -331,21 +333,20 @@ class RewardSpaceTestBase(unittest.TestCase): data = list(seq) if len(data) < 2: return - if non_increasing and non_decreasing or (not non_increasing and (not non_decreasing)): + if (non_increasing and non_decreasing) or (not non_increasing and (not non_decreasing)): self.fail("Specify exactly one monotonic direction") - for a, b in zip(data, data[1:]): + for a, b in itertools.pairwise(data): if non_increasing: if b > a + tolerance: self.fail(f"{name} not 
non-increasing at pair ({a}, {b})") - elif non_decreasing: - if b + tolerance < a: - self.fail(f"{name} not non-decreasing at pair ({a}, {b})") + elif non_decreasing and b + tolerance < a: + self.fail(f"{name} not non-decreasing at pair ({a}, {b})") def assertWithin( self, - value: Union[float, int], - low: Union[float, int], - high: Union[float, int], + value: float | int, + low: float | int, + high: float | int, *, name: str = "value", inclusive: bool = True, @@ -360,7 +361,7 @@ class RewardSpaceTestBase(unittest.TestCase): self.assertLess(value, high, f"{name} >= {high}") def assertNearZero( - self, value: Union[float, int], *, atol: Optional[float] = None, msg: Optional[str] = None + self, value: float | int, *, atol: float | None = None, msg: str | None = None ) -> None: """Assert a scalar is numerically near zero within absolute tolerance. @@ -377,9 +378,9 @@ class RewardSpaceTestBase(unittest.TestCase): a, b, *, - atol: Optional[float] = None, - rtol: Optional[float] = None, - msg: Optional[str] = None, + atol: float | None = None, + rtol: float | None = None, + msg: str | None = None, ) -> None: """Assert function(func, a, b) == function(func, b, a) within tolerance. diff --git a/ReforceXY/reward_space_analysis/tests/transforms/test_transforms.py b/ReforceXY/reward_space_analysis/tests/transforms/test_transforms.py index 4f8fc09..3004241 100644 --- a/ReforceXY/reward_space_analysis/tests/transforms/test_transforms.py +++ b/ReforceXY/reward_space_analysis/tests/transforms/test_transforms.py @@ -5,6 +5,7 @@ reducing duplication while maintaining full functional coverage for mathematical """ import math +from typing import ClassVar import pytest @@ -20,8 +21,8 @@ class TestTransforms(RewardSpaceTestBase): """Comprehensive transform function tests with parameterized scenarios.""" # Transform function test data - SMOOTH_TRANSFORMS = [t for t in ALLOWED_TRANSFORMS if t != "clip"] - ALL_TRANSFORMS = list(ALLOWED_TRANSFORMS) + SMOOTH_TRANSFORMS: ClassVar[list[str]] = [t for t in ALLOWED_TRANSFORMS if t != "clip"] + ALL_TRANSFORMS: ClassVar[list[str]] = list(ALLOWED_TRANSFORMS) def test_transform_exact_values(self): """Test transform functions produce exact expected values for specific inputs.""" @@ -34,14 +35,14 @@ class TestTransforms(RewardSpaceTestBase): ("asinh", [0.0], [0.0]), # More complex calculations tested separately # arctan transform: (2/pi) * arctan(x) in (-1, 1) ("arctan", [0.0, 1.0], [0.0, 2.0 / math.pi * math.atan(1.0)]), - # sigmoid transform: 2σ(x) - 1, σ(x) = 1/(1 + e^(-x)) in (-1, 1) + # sigmoid transform: 2σ(x) - 1, σ(x) = 1/(1 + e^(-x)) in (-1, 1) # noqa: RUF003 ("sigmoid", [0.0], [0.0]), # More complex calculations tested separately # clip transform: clip(x, -1, 1) in [-1, 1] ("clip", [0.0, 0.5, 2.0, -2.0], [0.0, 0.5, 1.0, -1.0]), ] for transform_name, test_values, expected_values in test_cases: - for test_val, expected_value in zip(test_values, expected_values): + for test_val, expected_value in zip(test_values, expected_values, strict=False): with self.subTest( transform=transform_name, input=test_val, expected=expected_value ): diff --git a/ReforceXY/reward_space_analysis/uv.lock b/ReforceXY/reward_space_analysis/uv.lock index fbd2d3c..505c04f 100644 --- a/ReforceXY/reward_space_analysis/uv.lock +++ b/ReforceXY/reward_space_analysis/uv.lock @@ -343,12 +343,11 @@ source = { editable = "." 
}
 dependencies = [
     { name = "numpy" },
     { name = "pandas" },
-    { name = "pytest" },
     { name = "scikit-learn" },
     { name = "scipy" },
 ]
 
-[package.dev-dependencies]
+[package.optional-dependencies]
 dev = [
     { name = "pytest" },
     { name = "pytest-cov" },
@@ -359,17 +358,13 @@ dev = [
 requires-dist = [
     { name = "numpy", specifier = ">=1.26" },
     { name = "pandas" },
-    { name = "pytest" },
+    { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" },
+    { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=7.0" },
+    { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8" },
     { name = "scikit-learn" },
     { name = "scipy", specifier = ">=1.11" },
 ]
-
-[package.metadata.requires-dev]
-dev = [
-    { name = "pytest", specifier = ">=6.0" },
-    { name = "pytest-cov", specifier = ">=7.0.0" },
-    { name = "ruff" },
-]
+provides-extras = ["dev"]
 
 [[package]]
 name = "ruff"
diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py
index 83c3ea6..8974741 100644
--- a/ReforceXY/user_data/freqaimodels/ReforceXY.py
+++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py
@@ -2033,9 +2033,7 @@ class MyRLEnv(Base5ActionRLEnv):
         if require_position and position not in (Positions.Long, Positions.Short):
             return 0.0
 
-        duration_ratio = 0.0 if duration_ratio < 0.0 else duration_ratio
-        if duration_ratio > 1.0:
-            duration_ratio = 1.0
+        duration_ratio = max(0.0, duration_ratio)
 
         try:
             pnl_ratio = pnl / pnl_target
@@ -2295,7 +2293,7 @@ class MyRLEnv(Base5ActionRLEnv):
 
         **State Variables:**
         r_pnl : pnl / pnl_target (PnL ratio)
-        r_dur : duration / max_duration (duration ratio, clamp [0,1])
+        r_dur : duration / max_duration (duration ratio, max 0)
         scale : scale parameter
         g : gain parameter
         T_x : transform function (tanh, softsign, etc.)
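
For context, a minimal sketch of the behaviour this patch changes, written against the quantities named in the docstring above (r_pnl, r_dur, scale, g, T_x). The helper names hold_potential and pbrs_shaping, the use of tanh as the transform, and the multiplicative combination of the two transformed terms are illustrative assumptions only, not the actual MyRLEnv implementation; the point is that r_dur is now only floored at 0 via max(duration_ratio, 0), the former upper clamp at 1 is removed, so holds past max_duration keep moving the potential.

import math

# Illustrative sketch only. hold_potential / pbrs_shaping are hypothetical
# helpers; the real logic lives in MyRLEnv (ReforceXY.py). tanh stands in for
# the configured bounded transforms T_pnl / T_dur.

def hold_potential(
    pnl: float,
    duration_ratio: float,
    pnl_target: float,
    base_factor: float = 100.0,
    hold_potential_ratio: float = 1.0,
    hold_potential_gain: float = 1.0,
) -> float:
    """Phi(s) for an open position with the new floor-only duration ratio."""
    r_pnl = pnl / pnl_target
    r_dur = max(duration_ratio, 0.0)  # floor at 0 only; no clamp to 1
    scale = base_factor * hold_potential_ratio
    g = hold_potential_gain
    # Assumed combination of the transformed terms, for illustration.
    return scale * math.tanh(g * r_pnl) * math.tanh(g * r_dur)

def pbrs_shaping(phi_prev: float, phi_next: float, gamma: float = 0.95) -> float:
    """PBRS shaping term: delta = gamma * Phi(next) - Phi(prev)."""
    return gamma * phi_next - phi_prev

if __name__ == "__main__":
    # Before this change, duration_ratio = 1.0 and 1.5 were both clamped to 1.0
    # and produced identical potentials; now the longer hold keeps evolving.
    phi_prev = hold_potential(pnl=0.01, duration_ratio=1.0, pnl_target=0.03)
    phi_next = hold_potential(pnl=0.01, duration_ratio=1.5, pnl_target=0.03)
    print(pbrs_shaping(phi_prev, phi_next))

Because the transforms remain bounded, the shaping term Δ = γ·Φ(next) − Φ(prev) stays within the |Δ| <= (1+γ)·base_factor envelope asserted in the PBRS tests even without the upper clamp on the duration ratio.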