]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
fix(ReforceXY): remove PBRS reward duration ratio clamping
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 26 Dec 2025 20:37:58 +0000 (21:37 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 26 Dec 2025 20:37:58 +0000 (21:37 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
21 files changed:
ReforceXY/reward_space_analysis/README.md
ReforceXY/reward_space_analysis/pyproject.toml
ReforceXY/reward_space_analysis/reward_space_analysis.py
ReforceXY/reward_space_analysis/test_reward_space_analysis_cli.py
ReforceXY/reward_space_analysis/tests/api/test_api_helpers.py
ReforceXY/reward_space_analysis/tests/cli/test_cli_params_and_csv.py
ReforceXY/reward_space_analysis/tests/components/test_reward_components.py
ReforceXY/reward_space_analysis/tests/constants.py
ReforceXY/reward_space_analysis/tests/helpers/__init__.py
ReforceXY/reward_space_analysis/tests/helpers/assertions.py
ReforceXY/reward_space_analysis/tests/helpers/configs.py
ReforceXY/reward_space_analysis/tests/helpers/warnings.py
ReforceXY/reward_space_analysis/tests/integration/test_integration.py
ReforceXY/reward_space_analysis/tests/pbrs/test_pbrs.py
ReforceXY/reward_space_analysis/tests/robustness/test_robustness.py
ReforceXY/reward_space_analysis/tests/statistics/test_feature_analysis_failures.py
ReforceXY/reward_space_analysis/tests/statistics/test_statistics.py
ReforceXY/reward_space_analysis/tests/test_base.py
ReforceXY/reward_space_analysis/tests/transforms/test_transforms.py
ReforceXY/reward_space_analysis/uv.lock
ReforceXY/user_data/freqaimodels/ReforceXY.py

index b830dfca813b4b5c9dfcbd8f559c68e9d44b7f9b..95c53eefdf07f0fa3b326b593f1c1d5200e509f4 100644 (file)
@@ -351,7 +351,7 @@ multiplier for loss-side holds:
 where:
 
 - `r_pnl = pnl / pnl_target`
-- `r_dur = clamp(duration_ratio, 0, 1)`
+- `r_dur = max(duration_ratio, 0)`
 - `scale = base_factor · hold_potential_ratio`
 - `g = hold_potential_gain`
 - `T_pnl`, `T_dur` = configured transforms
index c99aa8f36b765850926fc12444636ae48e2075f1..7f77c7fbef642b05507064f06f803cafef8495e5 100644 (file)
@@ -12,14 +12,13 @@ dependencies = [
     "pandas",
     "scikit-learn",
     "scipy>=1.11",
-    "pytest",
 ]
 
-[dependency-groups]
+[project.optional-dependencies]
 dev = [
-    "pytest>=6.0",
-    "ruff",
-    "pytest-cov>=7.0.0",
+    "pytest>=8.0",
+    "pytest-cov>=7.0",
+    "ruff>=0.8",
 ]
 
 [build-system]
@@ -59,22 +58,37 @@ log_cli_level = "INFO"
 log_cli_format = "%(asctime)s [%(levelname)8s] %(name)s: %(message)s"
 log_cli_date_format = "%Y-%m-%d %H:%M:%S"
 
-# Coverage configuration
 addopts = [
     "--verbose",
     "--tb=short",
     "--strict-markers",
     "--color=yes",
-    "--cov=reward_space_analysis",
-    "--cov-config=pyproject.toml",
-    "--cov-fail-under=85"
+    "--cov",
 ]
 
 [tool.coverage.run]
 source = ["reward_space_analysis"]
+branch = true
+parallel = true
+relative_files = true
 omit = [
-    "tests/*",
-    "test_*.py",
+    "*/tests/*",
+    "**/test_*.py",
+    "**/__pycache__/*",
+]
+
+[tool.coverage.report]
+show_missing = true
+skip_empty = true
+fail_under = 85
+exclude_lines = [
+    "pragma: no cover",
+    "def __repr__",
+    "raise AssertionError",
+    "raise NotImplementedError",
+    "if TYPE_CHECKING:",
+    "if __name__ == .__main__.:",
+    "@abstractmethod",
 ]
 
 [tool.ruff]
@@ -82,5 +96,26 @@ line-length = 100
 target-version = "py311"
 
 [tool.ruff.lint]
-select = ["E", "F", "W", "I"]
-ignore = ["E501"]
+select = [
+    "E",      # pycodestyle errors
+    "W",      # pycodestyle warnings
+    "F",      # pyflakes
+    "I",      # isort
+    "B",      # flake8-bugbear
+    "C4",     # flake8-comprehensions
+    "UP",     # pyupgrade
+    "SIM",    # flake8-simplify
+    "TCH",    # flake8-type-checking
+    "PTH",    # flake8-use-pathlib
+    "RUF",    # ruff-specific rules
+]
+ignore = [
+    "E501",   # line too long
+]
+
+[tool.ruff.lint.isort]
+known-first-party = ["reward_space_analysis"]
+
+[tool.ruff.format]
+quote-style = "double"
+indent-style = "space"
index dd476a962aaa9b6095d1854eaefc9298d58d6649..3cc3b113974e22256f36a61879250b43e9dbb00d 100644 (file)
@@ -18,7 +18,7 @@ import random
 import warnings
 from enum import Enum, IntEnum
 from pathlib import Path
-from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Literal
 
 import numpy as np
 import pandas as pd
@@ -26,6 +26,9 @@ from scipy import stats
 from scipy.spatial.distance import jensenshannon
 from scipy.stats import entropy, probplot
 
+if TYPE_CHECKING:
+    from collections.abc import Iterable
+
 try:
     from sklearn.ensemble import RandomForestRegressor
     from sklearn.inspection import partial_dependence, permutation_importance
@@ -73,15 +76,15 @@ DEFAULT_IDLE_DURATION_MULTIPLIER = 4
 # When that diagnostic column is not available (e.g., reporting from partial datasets),
 # we fall back to the weaker heuristic |Σ shaping| < PBRS_INVARIANCE_TOL.
 PBRS_INVARIANCE_TOL: float = 1e-6
-# Default discount factor γ for potential-based reward shaping
+# Default discount factor γ for potential-based reward shaping  # noqa: RUF003
 POTENTIAL_GAMMA_DEFAULT: float = 0.95
 
 # Default risk/reward ratio (RR)
 RISK_REWARD_RATIO_DEFAULT: float = 2.0
 
 # Supported attenuation modes
-ATTENUATION_MODES: Tuple[str, ...] = ("sqrt", "linear", "power", "half_life")
-ATTENUATION_MODES_WITH_LEGACY: Tuple[str, ...] = ("legacy",) + ATTENUATION_MODES
+ATTENUATION_MODES: tuple[str, ...] = ("sqrt", "linear", "power", "half_life")
+ATTENUATION_MODES_WITH_LEGACY: tuple[str, ...] = ("legacy", *ATTENUATION_MODES)
 
 # Internal numeric guards and behavior toggles
 INTERNAL_GUARDS: dict[str, float] = {
@@ -116,10 +119,10 @@ ALLOWED_EXIT_POTENTIAL_MODES = {
 }
 
 # Supported trading modes
-TRADING_MODES: Tuple[str, ...] = ("spot", "margin", "futures")
+TRADING_MODES: tuple[str, ...] = ("spot", "margin", "futures")
 
 # Supported p-value adjustment methods
-ADJUST_METHODS: Tuple[str, ...] = ("none", "benjamini_hochberg")
+ADJUST_METHODS: tuple[str, ...] = ("none", "benjamini_hochberg")
 # Alias without underscore for convenience
 _ADJUST_METHODS_ALIASES: frozenset[str] = frozenset({"benjaminihochberg"})
 
@@ -154,7 +157,7 @@ DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = {
     "exit_factor_threshold": 1000.0,
     # === PBRS PARAMETERS ===
     # Potential-based reward shaping core parameters
-    # Discount factor γ for potential term (0 ≤ γ ≤ 1)
+    # Discount factor γ for potential term (0 ≤ γ ≤ 1)  # noqa: RUF003
     "potential_gamma": POTENTIAL_GAMMA_DEFAULT,
     # Exit potential modes: canonical | non_canonical | progressive_release | spike_cancel | retain_previous
     "exit_potential_mode": "canonical",
@@ -181,7 +184,7 @@ DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = {
     "exit_additive_transform_duration": "tanh",
 }
 
-DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = {
+DEFAULT_MODEL_REWARD_PARAMETERS_HELP: dict[str, str] = {
     "invalid_action": "Penalty for invalid actions",
     "base_factor": "Base reward scale",
     "idle_penalty_power": "Idle penalty exponent",
@@ -203,9 +206,9 @@ DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = {
     "check_invariants": "Enable runtime invariant checks",
     "exit_factor_threshold": "Warn if |exit_factor| exceeds",
     # PBRS parameters
-    "potential_gamma": "PBRS discount γ (0–1)",
+    "potential_gamma": "PBRS discount γ (0-1)",  # noqa: RUF001
     "exit_potential_mode": "Exit potential mode (canonical|non_canonical|progressive_release|spike_cancel|retain_previous)",
-    "exit_potential_decay": "Decay for progressive_release (01)",
+    "exit_potential_decay": "Decay for progressive_release (0-1)",
     "hold_potential_enabled": "Enable hold potential Φ",
     "hold_potential_ratio": "Hold potential ratio",
     "hold_potential_gain": "Hold potential gain",
@@ -230,7 +233,7 @@ DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = {
 # Parameter validation utilities
 # ---------------------------------------------------------------------------
 
-_PARAMETER_BOUNDS: Dict[str, Dict[str, float]] = {
+_PARAMETER_BOUNDS: dict[str, dict[str, float]] = {
     # key: {min: ..., max: ...}  (bounds are inclusive where it makes sense)
     "invalid_action": {"max": 0.0},  # penalty should be <= 0
     "base_factor": {"min": 0.0},
@@ -261,8 +264,8 @@ _PARAMETER_BOUNDS: Dict[str, Dict[str, float]] = {
     "exit_additive_gain": {"min": 0.0},
 }
 
-RewardParamValue = Union[float, str, bool, None]
-RewardParams = Dict[str, RewardParamValue]
+RewardParamValue = float | str | bool | None
+RewardParams = dict[str, RewardParamValue]
 
 
 class RewardDiagnosticsWarning(RuntimeWarning):
@@ -316,7 +319,7 @@ def _to_bool(value: Any) -> bool:
     raise ValueError(f"Param: unrecognized boolean literal {value!r}")
 
 
-def _get_bool_param(params: RewardParams, key: str, default: Optional[bool] = None) -> bool:
+def _get_bool_param(params: RewardParams, key: str, default: bool | None = None) -> bool:
     """Extract boolean parameter with type safety.
 
     Args:
@@ -363,7 +366,7 @@ def _resolve_additive_enablement(
 
 
 def _get_float_param(
-    params: RewardParams, key: str, default: Optional[RewardParamValue] = None
+    params: RewardParams, key: str, default: RewardParamValue | None = None
 ) -> float:
     """Extract float parameter with type safety and default fallback.
 
@@ -409,7 +412,7 @@ def _clamp_float_to_bounds(
     key: str,
     value: float,
     *,
-    bounds: Optional[Dict[str, float]] = None,
+    bounds: dict[str, float] | None = None,
     strict: bool,
 ) -> tuple[float, list[str]]:
     """Clamp numeric `value` to bounds for `key`.
@@ -452,9 +455,7 @@ def _clamp_float_to_bounds(
     return adjusted, reason_parts
 
 
-def _get_int_param(
-    params: RewardParams, key: str, default: Optional[RewardParamValue] = None
-) -> int:
+def _get_int_param(params: RewardParams, key: str, default: RewardParamValue | None = None) -> int:
     """Extract integer parameter with robust coercion.
 
     Args:
@@ -502,7 +503,7 @@ def _get_int_param(
     return int(default) if isinstance(default, (int, float)) else 0
 
 
-def _get_str_param(params: RewardParams, key: str, default: Optional[str] = None) -> str:
+def _get_str_param(params: RewardParams, key: str, default: str | None = None) -> str:
     """Extract string parameter with type safety and default fallback.
 
     Args:
@@ -547,7 +548,7 @@ def _fail_safely(reason: str) -> float:
 def get_max_idle_duration_candles(
     params: RewardParams,
     *,
-    max_trade_duration_candles: Optional[int] = None,
+    max_trade_duration_candles: int | None = None,
 ) -> int:
     mtd = (
         int(max_trade_duration_candles)
@@ -569,7 +570,7 @@ def get_max_idle_duration_candles(
 def validate_reward_parameters(
     params: RewardParams,
     strict: bool = True,
-) -> Tuple[RewardParams, Dict[str, Dict[str, Any]]]:
+) -> tuple[RewardParams, dict[str, dict[str, Any]]]:
     """Clamp parameters to bounds and coerce booleans and numeric overrides.
 
     Returns a sanitized copy plus adjustments mapping (param -> original/adjusted/reason).
@@ -578,10 +579,10 @@ def validate_reward_parameters(
     - Numeric-bounded keys are coerced to float when provided as str/bool/None.
       * In strict mode: raise on non-numeric or out-of-bounds.
       * In relaxed mode: fallback to min bound or 0.0 with adjustment reason.
-    - Nonfinite numerics fall back to min bound or 0.0 (relaxed) or raise (strict).
+    - Non-finite numerics fall back to min bound or 0.0 (relaxed) or raise (strict).
     """
     sanitized = dict(params)
-    adjustments: Dict[str, Dict[str, Any]] = {}
+    adjustments: dict[str, dict[str, Any]] = {}
 
     # Boolean parameter coercion
     _bool_keys = [
@@ -665,7 +666,7 @@ def validate_reward_parameters(
         if not np.isclose(adjusted, original_numeric):
             sanitized[key] = adjusted
             prev_reason = adjustments.get(key, {}).get("reason")
-            reason: List[str] = []
+            reason: list[str] = []
             if prev_reason:
                 reason.append(prev_reason)
             reason.extend(reason_parts)
@@ -781,7 +782,7 @@ class RewardBreakdown:
     next_potential: float = 0.0
     # PBRS helpers
     base_reward: float = 0.0
-    pbrs_delta: float = 0.0  # Δ(s,a,s') = γ·Φ(s') − Φ(s)
+    pbrs_delta: float = 0.0  # Δ(s,a,s') = γ·Φ(s') − Φ(s)  # noqa: RUF003
     invariance_correction: float = 0.0
 
 
@@ -876,7 +877,7 @@ def _compute_time_attenuation_coefficient(
     else:
         effective_dr = duration_ratio
 
-    kernel = kernels.get(exit_attenuation_mode, None)
+    kernel = kernels.get(exit_attenuation_mode)
     if kernel is None:
         _warn_unknown_mode(
             "exit_attenuation_mode",
@@ -912,12 +913,12 @@ def _get_exit_factor(
     """
     Compute exit reward factor by applying multiplicative coefficients to base_factor.
 
-    Formula: exit_factor = base_factor × time_attenuation_coefficient × pnl_target_coefficient × efficiency_coefficient
+    Formula: exit_factor = base_factor * time_attenuation_coefficient * pnl_target_coefficient * efficiency_coefficient
 
     Args:
         base_factor: Base reward value before coefficient adjustments
         pnl: Realized profit/loss
-        pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio)
+        pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio)
         duration_ratio: Trade duration relative to target duration
         context: Trade context with unrealized profit/loss extremes
         params: Reward configuration parameters
@@ -955,7 +956,7 @@ def _get_exit_factor(
         if exit_factor < 0.0 and pnl >= 0.0:
             exit_factor = 0.0
         exit_factor_threshold = _get_float_param(params, "exit_factor_threshold")
-        if exit_factor_threshold > 0 and np.isfinite(exit_factor_threshold):
+        if exit_factor_threshold > 0 and np.isfinite(exit_factor_threshold):  # noqa: SIM102
             if abs(exit_factor) > exit_factor_threshold:
                 warnings.warn(
                     f"|exit_factor|={abs(exit_factor):.2f} > threshold={exit_factor_threshold:.2f}",
@@ -982,7 +983,7 @@ def _compute_pnl_target_coefficient(
     Args:
         params: Reward configuration parameters
         pnl: Realized profit/loss
-        pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio)
+        pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio)
         risk_reward_ratio: Risk/reward ratio for loss penalty calculation
 
     Returns:
@@ -1134,14 +1135,14 @@ def _compute_exit_reward(
 
     Args:
         base_factor: Base reward value before coefficient adjustments
-        pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio)
+        pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio)
         duration_ratio: Trade duration relative to target duration
         context: Trade context with PnL and unrealized profit/loss extremes
         params: Reward configuration parameters
         risk_reward_ratio: Risk/reward ratio (must match the value used to calculate pnl_target)
 
     Returns:
-        float: Exit reward (pnl × exit_factor)
+        float: Exit reward (pnl * exit_factor)
     """
     exit_factor = _get_exit_factor(
         base_factor, context.pnl, pnl_target, duration_ratio, context, params, risk_reward_ratio
@@ -1168,7 +1169,7 @@ def calculate_reward(
         short_allowed=short_allowed,
     )
 
-    base_reward: Optional[float] = None
+    base_reward: float | None = None
     if not is_valid and not action_masking:
         breakdown.invalid_penalty = _get_float_param(params, "invalid_action")
         base_reward = breakdown.invalid_penalty
@@ -1516,7 +1517,7 @@ def simulate_samples(
     )
     max_trade_duration_cap = int(max_trade_duration_candles * max_duration_ratio)
 
-    samples: list[Dict[str, float]] = []
+    samples: list[dict[str, float]] = []
     prev_potential: float = 0.0
 
     # Stateful trajectory variables
@@ -1763,7 +1764,7 @@ def _validate_simulation_invariants(df: pd.DataFrame) -> None:
         )
 
 
-def _compute_summary_stats(df: pd.DataFrame) -> Dict[str, Any]:
+def _compute_summary_stats(df: pd.DataFrame) -> dict[str, Any]:
     """Compute summary statistics without writing to file."""
     action_summary = df.groupby("action")["reward"].agg(["count", "mean", "std", "min", "max"])
     component_share = df[
@@ -1835,7 +1836,7 @@ def _binned_stats(
     return aggregated
 
 
-def _compute_relationship_stats(df: pd.DataFrame) -> Dict[str, Any]:
+def _compute_relationship_stats(df: pd.DataFrame) -> dict[str, Any]:
     """Return binned stats dict for idle, trade duration and pnl (uniform bins).
 
     Defensive against missing optional columns (e.g., reward_invalid when synthetic
@@ -1897,7 +1898,7 @@ def _compute_representativity_stats(
     df: pd.DataFrame,
     profit_aim: float,
     risk_reward_ratio: float,
-) -> Dict[str, Any]:
+) -> dict[str, Any]:
     """Compute representativity statistics for the reward space."""
     pnl_target = float(profit_aim * risk_reward_ratio)
     total = len(df)
@@ -1942,7 +1943,7 @@ def _perform_feature_analysis(
     skip_partial_dependence: bool = False,
     rf_n_jobs: int = 1,
     perm_n_jobs: int = 1,
-) -> Tuple[pd.DataFrame, Dict[str, Any], Dict[str, pd.DataFrame], Optional[RandomForestRegressor]]:
+) -> tuple[pd.DataFrame, dict[str, Any], dict[str, pd.DataFrame], RandomForestRegressor | None]:
     """Compute feature importances using RandomForestRegressor.
 
     Parameters
@@ -2064,7 +2065,7 @@ def _perform_feature_analysis(
             n_test=0,
         )
 
-    model: Optional[RandomForestRegressor] = RandomForestRegressor(
+    model: RandomForestRegressor | None = RandomForestRegressor(
         n_estimators=400,
         max_depth=None,
         random_state=seed,
@@ -2119,7 +2120,7 @@ def _perform_feature_analysis(
         )
 
     # Partial dependence (optional)
-    partial_deps: Dict[str, pd.DataFrame] = {}
+    partial_deps: dict[str, pd.DataFrame] = {}
     if model is not None and not skip_partial_dependence:
         for feature in [
             f for f in ["trade_duration", "idle_duration", "pnl"] if f in X_test.columns
@@ -2192,10 +2193,10 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr
         else:
             try:
                 df = pd.DataFrame(list(candidate))
-            except TypeError:
+            except TypeError as e:
                 raise ValueError(
                     f"Data: 'transitions' in '{path}' is not iterable (type {type(candidate)!r})"
-                )
+                ) from e
             except Exception as e:
                 raise ValueError(
                     f"Data: could not build DataFrame from 'transitions' in '{path}': {e!r}"
@@ -2214,10 +2215,10 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr
                 else:
                     try:
                         all_transitions.extend(list(trans))
-                    except TypeError:
+                    except TypeError as e:
                         raise ValueError(
                             f"Data: episode 'transitions' is not iterable in '{path}' (type {type(trans)!r})"
-                        )
+                        ) from e
             else:
                 skipped += 1
         if skipped:
@@ -2298,7 +2299,7 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr
         if enforce_columns:
             raise ValueError(
                 f"Data: missing required columns {sorted(missing_required)}. "
-                f"Found: {sorted(list(df.columns))}"
+                f"Found: {sorted(df.columns)}"
             )
         warnings.warn(
             f"Missing columns {sorted(missing_required)}; filled with NaN when loading (enforce_columns=False)",
@@ -2329,7 +2330,7 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr
 def compute_distribution_shift_metrics(
     synthetic_df: pd.DataFrame,
     real_df: pd.DataFrame,
-) -> Dict[str, float]:
+) -> dict[str, float]:
     """Compute distribution shift metrics between synthetic and real samples.
 
     Returns KL divergence, JS distance, Wasserstein distance, and KS test
@@ -2395,7 +2396,7 @@ def compute_distribution_shift_metrics(
     return metrics
 
 
-def _validate_distribution_metrics(metrics: Dict[str, float]) -> None:
+def _validate_distribution_metrics(metrics: dict[str, float]) -> None:
     """Validate mathematical bounds of distribution shift metrics."""
     for key, value in metrics.items():
         if not np.isfinite(value):
@@ -2406,28 +2407,25 @@ def _validate_distribution_metrics(metrics: Dict[str, float]) -> None:
             raise AssertionError(f"KL divergence {key} must be >= 0, got {value:.6f}")
 
         # JS distance must be in [0, 1]
-        if "js_distance" in key:
-            if not (0 <= value <= 1):
-                raise AssertionError(f"JS distance {key} must be in [0,1], got {value:.6f}")
+        if "js_distance" in key and not (0 <= value <= 1):
+            raise AssertionError(f"JS distance {key} must be in [0,1], got {value:.6f}")
 
         # Wasserstein distance must be >= 0
         if "wasserstein" in key and value < 0:
             raise AssertionError(f"Wasserstein distance {key} must be >= 0, got {value:.6f}")
 
         # KS statistic must be in [0, 1]
-        if "ks_statistic" in key:
-            if not (0 <= value <= 1):
-                raise AssertionError(f"KS statistic {key} must be in [0,1], got {value:.6f}")
+        if "ks_statistic" in key and not (0 <= value <= 1):
+            raise AssertionError(f"KS statistic {key} must be in [0,1], got {value:.6f}")
 
         # p-values must be in [0, 1]
-        if "pvalue" in key:
-            if not (0 <= value <= 1):
-                raise AssertionError(f"p-value {key} must be in [0,1], got {value:.6f}")
+        if "pvalue" in key and not (0 <= value <= 1):
+            raise AssertionError(f"p-value {key} must be in [0,1], got {value:.6f}")
 
 
 def statistical_hypothesis_tests(
     df: pd.DataFrame, *, adjust_method: str = ADJUST_METHODS[0], seed: int = 42
-) -> Dict[str, Any]:
+) -> dict[str, Any]:
     """Statistical hypothesis tests (Spearman, Kruskal-Wallis, Mann-Whitney).
 
     Parameters
@@ -2547,7 +2545,7 @@ def statistical_hypothesis_tests(
         adj_final = np.empty_like(adj_sorted)
         adj_final[order] = np.clip(adj_sorted, 0, 1)
         # Attach adjusted p-values and recompute significance
-        for (name, res), p_adj in zip(items, adj_final):
+        for (name, res), p_adj in zip(items, adj_final, strict=False):
             res["p_value_adj"] = float(p_adj)
             res["significant_adj"] = bool(p_adj < alpha)
             results[name] = res
@@ -2558,7 +2556,7 @@ def statistical_hypothesis_tests(
     return results
 
 
-def _validate_hypothesis_test_results(results: Dict[str, Any]) -> None:
+def _validate_hypothesis_test_results(results: dict[str, Any]) -> None:
     """Validate statistical properties of hypothesis test results."""
     for test_name, result in results.items():
         # All p-values must be in [0, 1] or NaN (for cases like constant input)
@@ -2616,13 +2614,13 @@ def _validate_hypothesis_test_results(results: Dict[str, Any]) -> None:
 
 def bootstrap_confidence_intervals(
     df: pd.DataFrame,
-    metrics: List[str],
+    metrics: list[str],
     n_bootstrap: int = 10000,
     confidence_level: float = 0.95,
     seed: int = 42,
     *,
     strict_diagnostics: bool = False,
-) -> Dict[str, Tuple[float, float, float]]:
+) -> dict[str, tuple[float, float, float]]:
     """Compute bootstrap confidence intervals for metric means.
 
     Returns percentile-based CIs, skipping metrics with <10 samples.
@@ -2639,6 +2637,7 @@ def bootstrap_confidence_intervals(
         warnings.warn(
             f"n_bootstrap={n_bootstrap} < {min_rec}; confidence intervals may be unstable",
             RewardDiagnosticsWarning,
+            stacklevel=2,
         )
 
     # Local RNG to avoid mutating global NumPy RNG state
@@ -2686,7 +2685,7 @@ def bootstrap_confidence_intervals(
 
 
 def _validate_bootstrap_results(
-    results: Dict[str, Tuple[float, float, float]], *, strict_diagnostics: bool
+    results: dict[str, tuple[float, float, float]], *, strict_diagnostics: bool
 ) -> None:
     """Validate each bootstrap CI: finite bounds, ordered, positive width (adjust or raise)."""
     for metric, (mean, ci_low, ci_high) in results.items():
@@ -2710,10 +2709,7 @@ def _validate_bootstrap_results(
             if strict_diagnostics:
                 raise AssertionError(f"Bootstrap CI for {metric}: non-positive width {width:.6f}")
             # Graceful mode: expand interval symmetrically
-            if width == 0:
-                epsilon = INTERNAL_GUARDS["degenerate_ci_epsilon"]
-            else:
-                epsilon = abs(width) * 1e-6
+            epsilon = INTERNAL_GUARDS["degenerate_ci_epsilon"] if width == 0 else abs(width) * 1e-06
             center = mean
             # Adjust only if current bounds are identical; otherwise enforce ordering minimally.
             if ci_low == ci_high:
@@ -2728,6 +2724,7 @@ def _validate_bootstrap_results(
             warnings.warn(
                 f"bootstrap_ci for '{metric}' degenerate (width={width:.6e}); adjusted with epsilon={epsilon:.1e}",
                 RewardDiagnosticsWarning,
+                stacklevel=2,
             )
 
 
@@ -2736,7 +2733,7 @@ def distribution_diagnostics(
     *,
     seed: int | None = None,
     strict_diagnostics: bool = False,
-) -> Dict[str, Any]:
+) -> dict[str, Any]:
     """Return mapping col-> diagnostics (tests, moments, entropy, divergences).
 
     Skips missing columns; selects Shapiro-Wilk when n<=5000 else K2; ignores non-finite intermediates.
@@ -2763,7 +2760,7 @@ def distribution_diagnostics(
             msg = f"Extreme moment(s) for {col}: skew={skew_v:.3e}, kurtosis={kurt_v:.3e} exceeds threshold {thr}."
             if strict_diagnostics:
                 raise AssertionError(msg)
-            warnings.warn(msg, RewardDiagnosticsWarning)
+            warnings.warn(msg, RewardDiagnosticsWarning, stacklevel=2)
 
         if len(data) < 5000:
             sw_stat, sw_pval = stats.shapiro(data)
@@ -2785,7 +2782,7 @@ def distribution_diagnostics(
     return diagnostics
 
 
-def _validate_distribution_diagnostics(diag: Dict[str, Any], *, strict_diagnostics: bool) -> None:
+def _validate_distribution_diagnostics(diag: dict[str, Any], *, strict_diagnostics: bool) -> None:
     """Validate mathematical properties of distribution diagnostics.
 
     Ensures all reported statistics are finite and within theoretical bounds where applicable.
@@ -2800,7 +2797,7 @@ def _validate_distribution_diagnostics(diag: Dict[str, Any], *, strict_diagnosti
             zero_var_columns.add(prefix)
 
     for key, value in list(diag.items()):
-        if any(suffix in key for suffix in ["_mean", "_std", "_skewness", "_kurtosis"]):
+        if any(suffix in key for suffix in ["_mean", "_std", "_skewness", "_kurtosis"]):  # noqa: SIM102
             if not np.isfinite(value):
                 # Graceful degradation for constant distributions: skewness/kurtosis become NaN.
                 constant_problem = any(
@@ -2814,13 +2811,13 @@ def _validate_distribution_diagnostics(diag: Dict[str, Any], *, strict_diagnosti
                     warnings.warn(
                         f"{key} undefined (constant distribution); falling back to {fallback}",
                         RewardDiagnosticsWarning,
+                        stacklevel=2,
                     )
                 else:
                     raise AssertionError(f"Distribution diagnostic {key} is not finite: {value}")
-        if key.endswith("_shapiro_pval"):
-            if not (0 <= value <= 1):
-                raise AssertionError(f"Shapiro p-value {key} must be in [0,1], got {value}")
-        if key.endswith("_anderson_stat") or key.endswith("_anderson_critical_5pct"):
+        if key.endswith("_shapiro_pval") and not (0 <= value <= 1):
+            raise AssertionError(f"Shapiro p-value {key} must be in [0,1], got {value}")
+        if key.endswith("_anderson_stat") or key.endswith("_anderson_critical_5pct"):  # noqa: SIM102
             if not np.isfinite(value):
                 prefix = key.rsplit("_", 2)[0]
                 if prefix in zero_var_columns and not strict_diagnostics:
@@ -2829,10 +2826,11 @@ def _validate_distribution_diagnostics(diag: Dict[str, Any], *, strict_diagnosti
                     warnings.warn(
                         f"{key} undefined (constant distribution); falling back to {fallback}",
                         RewardDiagnosticsWarning,
+                        stacklevel=2,
                     )
                     continue
                 raise AssertionError(f"Anderson statistic {key} must be finite, got {value}")
-        if key.endswith("_qq_r_squared"):
+        if key.endswith("_qq_r_squared"):  # noqa: SIM102
             if not (isinstance(value, (int, float)) and np.isfinite(value) and 0 <= value <= 1):
                 prefix = key[: -len("_qq_r_squared")]
                 if prefix in zero_var_columns and not strict_diagnostics:
@@ -2841,6 +2839,7 @@ def _validate_distribution_diagnostics(diag: Dict[str, Any], *, strict_diagnosti
                     warnings.warn(
                         f"{key} undefined (constant distribution); falling back to {fallback_r2}",
                         RewardDiagnosticsWarning,
+                        stacklevel=2,
                     )
                 else:
                     raise AssertionError(f"Q-Q R^2 {key} must be in [0,1], got {value}")
@@ -2868,7 +2867,7 @@ def _apply_transform_arctan(value: float) -> float:
 
 
 def _apply_transform_sigmoid(value: float) -> float:
-    """sigmoid: 2σ(x) - 1, σ(x) = 1/(1 + e^(-x)) in (-1, 1)."""
+    """sigmoid: 2σ(x) - 1, σ(x) = 1/(1 + e^(-x)) in (-1, 1)."""  # noqa: RUF002
     x = value
     try:
         if x >= 0:
@@ -3196,13 +3195,13 @@ def compute_pbrs_components(
     R'(s,a,s') = R(s,a,s') + Δ(s,a,s')
 
     where:
-        Δ(s,a,s') = γ·Φ(s') - Φ(s)  (PBRS shaping term)
+        Δ(s,a,s') = gamma * Phi(s') - Phi(s)  (PBRS shaping term)
 
     Hold Potential Formula
     ----------------------
     Let:
         r_pnl = pnl / pnl_target
-        r_dur = clamp(duration_ratio, 0, 1)
+        r_dur = max(duration_ratio, 0)
         scale = base_factor · hold_potential_ratio
         g = gain
         T_pnl, T_dur = configured bounded transforms
@@ -3345,7 +3344,7 @@ def _compute_pnl_duration_signal(
     non_finite_key: str,
     *,
     base_factor: float,
-    risk_reward_ratio: Optional[float] = None,
+    risk_reward_ratio: float | None = None,
 ) -> float:
     """Generic helper for (pnl, duration) bi-component transforms."""
     if not (np.isfinite(pnl) and np.isfinite(pnl_target) and np.isfinite(duration_ratio)):
@@ -3354,7 +3353,7 @@ def _compute_pnl_duration_signal(
         return _fail_safely(f"{kind}_invalid_pnl_target")
 
     pnl_ratio = float(pnl / pnl_target)
-    duration_ratio = float(np.clip(duration_ratio, 0.0, 1.0))
+    duration_ratio = float(max(0.0, duration_ratio))
 
     ratio = _get_float_param(params, scale_key)
     scale = ratio * base_factor
@@ -3537,10 +3536,10 @@ def write_complete_statistical_analysis(
     profit_aim: float,
     risk_reward_ratio: float,
     seed: int,
-    real_df: Optional[pd.DataFrame] = None,
+    real_df: pd.DataFrame | None = None,
     *,
     adjust_method: str = ADJUST_METHODS[0],
-    stats_seed: Optional[int] = None,
+    stats_seed: int | None = None,
     strict_diagnostics: bool = False,
     bootstrap_resamples: int = 10000,
     skip_partial_dependence: bool = False,
@@ -3590,7 +3589,7 @@ def write_complete_statistical_analysis(
             sep += "|" + "-" * (len(str(c)) + 2)
         sep += "|\n"
         # Rows
-        rows: List[str] = []
+        rows: list[str] = []
         for idx, row in df.iterrows():
             vals = [_fmt_val(row[c], ndigits) for c in cols]
             rows.append("| " + str(idx) + " | " + " | ".join(vals) + " |")
@@ -3720,7 +3719,7 @@ def write_complete_statistical_analysis(
         # Blank separator before overrides block
         f.write("|  |  |\n")
 
-        overrides_pairs: List[str] = []
+        overrides_pairs: list[str] = []
         if reward_params:
             for k, default_v in DEFAULT_MODEL_REWARD_PARAMETERS.items():
                 if k in ("exit_potential_mode", "potential_gamma"):
@@ -3755,7 +3754,7 @@ def write_complete_statistical_analysis(
         f.write("### 1.3 Component Activation Rates\n\n")
         f.write("Percentage of samples where each reward component is non-zero:\n\n")
         comp_share = summary_stats["component_share"].copy()
-        formatted_rows: List[str] = [
+        formatted_rows: list[str] = [
             "| Component | Activation Rate |",
             "|-----------|----------------|",
         ]
@@ -3864,7 +3863,7 @@ def write_complete_statistical_analysis(
         f.write(_df_to_md(corr_df, index_name=corr_df.index.name, ndigits=4))
         _dropped = relationship_stats.get("correlation_dropped") or []
         if _dropped:
-            dropped_strs: List[str] = [str(x) for x in _dropped]
+            dropped_strs: list[str] = [str(x) for x in _dropped]
             f.write("\n_Constant features removed: " + ", ".join(dropped_strs) + "._\n\n")
 
         # Section 3.5: PBRS Analysis
@@ -3933,10 +3932,10 @@ def write_complete_statistical_analysis(
                 f.write("|--------|-------|-------------|\n")
                 f.write(f"| Mean Base Reward | {mean_base:.6f} | Average reward before PBRS |\n")
                 f.write(f"| Std Base Reward | {std_base:.6f} | Variability of base reward |\n")
-                f.write(f"| Mean PBRS Delta | {mean_pbrs:.6f} | Average γ·Φ(s')−Φ(s) |\n")
+                f.write(f"| Mean PBRS Delta | {mean_pbrs:.6f} | Average γ·Φ(s')−Φ(s) |\n")  # noqa: RUF001
                 f.write(f"| Std PBRS Delta | {std_pbrs:.6f} | Variability of PBRS delta |\n")
                 f.write(
-                    f"| Mean Invariance Correction | {mean_inv_corr:.6f} | Average reward_shaping − pbrs_delta |\n"
+                    f"| Mean Invariance Correction | {mean_inv_corr:.6f} | Average reward_shaping − pbrs_delta |\n"  # noqa: RUF001
                 )
                 f.write(
                     f"| Std Invariance Correction | {std_inv_corr:.6f} | Variability of correction |\n"
@@ -4093,7 +4092,7 @@ def write_complete_statistical_analysis(
                 # Render as markdown without index column
                 header = "| feature | importance_mean | importance_std |\n"
                 sep = "|---------|------------------|----------------|\n"
-                rows: List[str] = []
+                rows: list[str] = []
                 for _, r in top_imp.iterrows():
                     rows.append(
                         f"| {r['feature']} | {_fmt_val(r['importance_mean'], 6)} | {_fmt_val(r['importance_std'], 6)} |"
@@ -4120,16 +4119,16 @@ def write_complete_statistical_analysis(
                 h = hypothesis_tests["idle_correlation"]
                 f.write("#### 5.1.1 Idle Duration → Idle Penalty Correlation\n\n")
                 f.write(f"**Test Method:** {h['test']}\n\n")
-                f.write(f"- Spearman ρ: **{h['rho']:.4f}**\n")
+                f.write(f"- Spearman ρ: **{h['rho']:.4f}**\n")  # noqa: RUF001
                 f.write(f"- p-value: {h['p_value']:.4g}\n")
                 if "p_value_adj" in h:
                     f.write(
-                        f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n"
+                        f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n"  # noqa: RUF001
                     )
                 f.write(f"- 95% CI: [{h['ci_95'][0]:.4f}, {h['ci_95'][1]:.4f}]\n")
                 f.write(f"- CI width: {(h['ci_95'][1] - h['ci_95'][0]):.4f}\n")
                 f.write(f"- Sample size: {h['n_samples']:,}\n")
-                f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n")
+                f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n")  # noqa: RUF001
                 f.write(f"- **Interpretation:** {h['interpretation']}\n\n")
 
             if "position_reward_difference" in hypothesis_tests:
@@ -4140,11 +4139,11 @@ def write_complete_statistical_analysis(
                 f.write(f"- p-value: {h['p_value']:.4g}\n")
                 if "p_value_adj" in h:
                     f.write(
-                        f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n"
+                        f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n"  # noqa: RUF001
                     )
                 f.write(f"- Effect size (ε²): {h['effect_size_epsilon_sq']:.4f}\n")
                 f.write(f"- Number of groups: {h['n_groups']}\n")
-                f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n")
+                f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n")  # noqa: RUF001
                 f.write(f"- **Interpretation:** {h['interpretation']} effect\n\n")
 
             if "pnl_sign_reward_difference" in hypothesis_tests:
@@ -4155,11 +4154,11 @@ def write_complete_statistical_analysis(
                 f.write(f"- p-value: {h['p_value']:.4g}\n")
                 if "p_value_adj" in h:
                     f.write(
-                        f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n"
+                        f"- p-value (adj BH): {h['p_value_adj']:.4g} -> {'✅ Yes' if h['significant_adj'] else '❌ No'} (α=0.05)\n"  # noqa: RUF001
                     )
                 f.write(f"- Median (PnL+): {h['median_pnl_positive']:.4f}\n")
                 f.write(f"- Median (PnL-): {h['median_pnl_negative']:.4f}\n")
-                f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n\n")
+                f.write(f"- Significant (α=0.05): {'✅ Yes' if h['significant'] else '❌ No'}\n\n")  # noqa: RUF001
 
             # Bootstrap CI
             if bootstrap_ci:
@@ -4408,7 +4407,7 @@ def main() -> None:
         "action_masking",
     ]
 
-    sim_params: Dict[str, Any] = {}
+    sim_params: dict[str, Any] = {}
     for k in candidate_keys:
         if k in args_dict:
             v = args_dict[k]
@@ -4460,12 +4459,12 @@ def main() -> None:
     # Generate manifest summarizing key metrics
     try:
         manifest_path = args.out_dir / "manifest.json"
-        resolved_reward_params: Dict[str, Any] = dict(
+        resolved_reward_params: dict[str, Any] = dict(
             params
         )  # already validated/normalized upstream
-        manifest: Dict[str, Any] = {
+        manifest: dict[str, Any] = {
             "generated_at": pd.Timestamp.now().isoformat(),
-            "num_samples": int(len(df)),
+            "num_samples": len(df),
             "seed": int(args.seed),
             "pnl_target": float(profit_aim * risk_reward_ratio),
             "pvalue_adjust_method": args.pvalue_adjust,
@@ -4475,13 +4474,13 @@ def main() -> None:
         sim_params_dict = df.attrs.get("simulation_params", {})
         if not isinstance(sim_params_dict, dict):
             sim_params_dict = {}
-        sim_params: Dict[str, Any] = dict(sim_params_dict)
+        sim_params: dict[str, Any] = dict(sim_params_dict)
         if sim_params:
             excluded_for_hash = {"out_dir", "real_episodes"}
-            sim_params_for_hash: Dict[str, Any] = {
+            sim_params_for_hash: dict[str, Any] = {
                 k: sim_params[k] for k in sim_params if k not in excluded_for_hash
             }
-            _hash_source: Dict[str, Any] = {
+            _hash_source: dict[str, Any] = {
                 **{f"sim::{k}": sim_params_for_hash[k] for k in sorted(sim_params_for_hash)},
                 **{
                     f"reward::{k}": resolved_reward_params[k]
index b50f2e1414e4e2c131b1e616b583658953912edb..2bd616675fec2799ea8fb380394f6f084146d25b 100644 (file)
@@ -34,6 +34,7 @@ Exit codes
 from __future__ import annotations
 
 import argparse
+import contextlib
 import itertools
 import json
 import math
@@ -47,15 +48,15 @@ import sys
 import tempfile
 import time
 from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple, TypedDict
+from typing import Any, TypedDict
 
 try:
     from typing import NotRequired, Required  # Python >=3.11
 except ImportError:
-    from typing_extensions import NotRequired, Required  # Python <3.11
+    from typing import NotRequired, Required  # Python <3.11
 
 
-ConfigTuple = Tuple[str, str, float, int, int, int]
+ConfigTuple = tuple[str, str, float, int, int, int]
 
 SUMMARY_FILENAME = "reward_space_cli.json"
 
@@ -66,25 +67,25 @@ class ScenarioResult(TypedDict):
     stdout: str
     stderr: str
     strict: bool
-    seconds: Optional[float]
+    seconds: float | None
     warnings: int
 
 
 class SummaryResult(TypedDict, total=False):
     # Required keys
     total: Required[int]
-    successes: Required[List[ScenarioResult]]
-    failures: Required[List[ScenarioResult]]
-    mean_seconds: Required[Optional[float]]
-    max_seconds: Required[Optional[float]]
-    min_seconds: Required[Optional[float]]
-    median_seconds: Required[Optional[float]]
-    p95_seconds: Required[Optional[float]]
+    successes: Required[list[ScenarioResult]]
+    failures: Required[list[ScenarioResult]]
+    mean_seconds: Required[float | None]
+    max_seconds: Required[float | None]
+    min_seconds: Required[float | None]
+    median_seconds: Required[float | None]
+    p95_seconds: Required[float | None]
 
     # Extension keys
-    warnings_breakdown: NotRequired[Dict[str, int]]
-    seeds: NotRequired[Dict[str, Any]]
-    metadata: NotRequired[Dict[str, Any]]
+    warnings_breakdown: NotRequired[dict[str, int]]
+    seeds: NotRequired[dict[str, Any]]
+    metadata: NotRequired[dict[str, Any]]
     interrupted: NotRequired[bool]
 
 
@@ -102,8 +103,8 @@ def _is_warning_header(line: str) -> bool:
 
 def build_arg_matrix(
     max_scenarios: int = 40,
-    shuffle_seed: Optional[int] = None,
-) -> List[ConfigTuple]:
+    shuffle_seed: int | None = None,
+) -> list[ConfigTuple]:
     exit_potential_modes = [
         "canonical",
         "non_canonical",
@@ -126,7 +127,7 @@ def build_arg_matrix(
         exit_additive_enabled,
     )
 
-    full: List[ConfigTuple] = list(product_iter)
+    full: list[ConfigTuple] = list(product_iter)
     full = [c for c in full if not (c[0] == "canonical" and (c[4] == 1 or c[5] == 1))]
     if shuffle_seed is not None:
         rnd = random.Random(shuffle_seed)
@@ -135,10 +136,10 @@ def build_arg_matrix(
         return full
     step = len(full) / max_scenarios
     idx_pos = step / 2.0  # Centered sampling
-    selected: List[ConfigTuple] = []
+    selected: list[ConfigTuple] = []
     selected_indices: set[int] = set()
     for _ in range(max_scenarios):
-        idx = int(round(idx_pos))
+        idx = round(idx_pos)
         if idx < 0:
             idx = 0
         elif idx >= len(full):
@@ -177,7 +178,7 @@ def run_scenario(
     skip_partial_dependence: bool = False,
     unrealized_pnl: bool = False,
     full_logs: bool = False,
-    params: Optional[List[str]] = None,
+    params: list[str] | None = None,
     tail_chars: int = 5000,
 ) -> ScenarioResult:
     (
@@ -223,7 +224,7 @@ def run_scenario(
     if strict:
         cmd.append("--strict_diagnostics")
     if params:
-        cmd += ["--params"] + list(params)
+        cmd += ["--params", *list(params)]
     start = time.perf_counter()
     try:
         proc = subprocess.run(cmd, capture_output=True, text=True, check=False, timeout=timeout)
@@ -371,8 +372,8 @@ def main():
     scenarios = build_arg_matrix(max_scenarios=args.max_scenarios, shuffle_seed=args.shuffle_seed)
 
     # Validate --params basic KEY=VALUE format
-    valid_params: List[str] = []
-    invalid_params: List[str] = []
+    valid_params: list[str] = []
+    invalid_params: list[str] = []
     for p in args.params:
         if "=" in p:
             valid_params.append(p)
@@ -384,7 +385,7 @@ def main():
     args.params = valid_params
 
     # Prepare list of (conf, strict)
-    scenario_pairs: List[Tuple[ConfigTuple, bool]] = [(c, False) for c in scenarios]
+    scenario_pairs: list[tuple[ConfigTuple, bool]] = [(c, False) for c in scenarios]
     indices = {conf: idx for idx, conf in enumerate(scenarios, start=1)}
     n_duplicated = min(max(0, args.strict_sample), len(scenarios))
     if n_duplicated > 0:
@@ -392,7 +393,7 @@ def main():
     for c in scenarios[:n_duplicated]:
         scenario_pairs.append((c, True))
 
-    results: List[ScenarioResult] = []
+    results: list[ScenarioResult] = []
     total = len(scenario_pairs)
     interrupted = False
     try:
@@ -425,7 +426,7 @@ def main():
 
     successes = [r for r in results if r["status"] == "ok"]
     failures = [r for r in results if r["status"] != "ok"]
-    durations: List[float] = [
+    durations: list[float] = [
         float(r["seconds"]) for r in results if isinstance(r["seconds"], float)
     ]
     if durations:
@@ -436,8 +437,8 @@ def main():
             p95_seconds = _sorted[0]
         else:
             pos = 0.95 * (n - 1)
-            i0 = int(math.floor(pos))
-            i1 = int(math.ceil(pos))
+            i0 = math.floor(pos)
+            i1 = math.ceil(pos)
             if i0 == i1:
                 p95_seconds = _sorted[i0]
             else:
@@ -457,7 +458,7 @@ def main():
         "p95_seconds": p95_seconds,
     }
     # Build warnings breakdown
-    warnings_breakdown: Dict[str, int] = {}
+    warnings_breakdown: dict[str, int] = {}
     for r in results:
         text = (r["stderr"] + "\n" + r["stdout"]).splitlines()
         for line in text:
@@ -466,7 +467,7 @@ def main():
                 warnings_breakdown[fp] = warnings_breakdown.get(fp, 0) + 1
 
     # Collect reproducibility metadata
-    def _git_hash() -> Optional[str]:
+    def _git_hash() -> str | None:
         try:
             proc = subprocess.run(
                 ["git", "rev-parse", "--short", "HEAD"],
@@ -504,10 +505,11 @@ def main():
         summary["interrupted"] = True
     # Atomic write to avoid corrupt partial files
     tmp_fd, tmp_path = tempfile.mkstemp(prefix="_tmp_summary_", dir=str(out_dir))
+    tmp_path_obj = Path(tmp_path)
     try:
         with os.fdopen(tmp_fd, "w", encoding="utf-8") as fh:
             json.dump(summary, fh, indent=2)
-        os.replace(tmp_path, out_dir / SUMMARY_FILENAME)
+        tmp_path_obj.replace(out_dir / SUMMARY_FILENAME)
     except Exception:
         # Best effort fallback
         try:
@@ -515,18 +517,14 @@ def main():
                 json.dumps(summary, indent=2), encoding="utf-8"
             )
         finally:
-            if os.path.exists(tmp_path):
-                try:
-                    os.remove(tmp_path)
-                except OSError:
-                    pass
+            if tmp_path_obj.exists():
+                with contextlib.suppress(OSError):
+                    tmp_path_obj.unlink()
     else:
         # Defensive cleanup: remove temp file if atomic replace did not clean up
-        if os.path.exists(tmp_path):
-            try:
-                os.remove(tmp_path)
-            except OSError:
-                pass
+        if tmp_path_obj.exists():
+            with contextlib.suppress(OSError):
+                tmp_path_obj.unlink()
     print(f"Summary saved to: {out_dir / SUMMARY_FILENAME}")
     if not interrupted and summary["failures"]:
         print("Failures detected:")
index d07eaacefffd29f66bd3b8494ebcd50302b98435..b794f95c8054d65db1b5e6c2e8e21f62fb90a378 100644 (file)
@@ -235,7 +235,9 @@ class TestAPIAndHelpers(RewardSpaceTestBase):
         self.assertTrue(math.isnan(_get_float_param({"k": float("-inf")}, "k", 0.0)))
         self.assertTrue(math.isnan(_get_float_param({"k": np.nan}, "k", 0.0)))
         self.assertTrue(
-            math.isnan(_get_float_param(cast(RewardParams, {"k": cast(Any, [1, 2, 3])}), "k", 0.0))
+            math.isnan(
+                _get_float_param(cast("RewardParams", {"k": cast("Any", [1, 2, 3])}), "k", 0.0)
+            )
         )
 
     def test_get_str_param(self):
@@ -284,7 +286,9 @@ class TestAPIAndHelpers(RewardSpaceTestBase):
         self.assertEqual(_get_int_param({"k": ""}, "k", 5), 5)
         self.assertEqual(_get_int_param({"k": "abc"}, "k", 5), 5)
         self.assertEqual(_get_int_param({"k": "NaN"}, "k", 5), 5)
-        self.assertEqual(_get_int_param(cast(RewardParams, {"k": cast(Any, [1, 2, 3])}), "k", 3), 3)
+        self.assertEqual(
+            _get_int_param(cast("RewardParams", {"k": cast("Any", [1, 2, 3])}), "k", 3), 3
+        )
         self.assertEqual(_get_int_param({}, "missing", "zzz"), 0)
 
     def test_argument_parser_construction(self):
index 33b03dc3e7df43167aa5d07142ee400e79a6013b..6dd93e71e3e0ad2d37f1d286b3e550091726792b 100644 (file)
@@ -62,11 +62,11 @@ class TestCsvEncoding(RewardSpaceTestBase):
         self.assertIn("action", df.columns)
         values = df["action"].tolist()
         self.assertTrue(
-            all((float(v).is_integer() for v in values)),
+            all(float(v).is_integer() for v in values),
             "Non-integer values detected in 'action' column",
         )
         allowed = {int(action.value) for action in Actions}
-        self.assertTrue(set((int(v) for v in values)).issubset(allowed))
+        self.assertTrue({int(v) for v in values}.issubset(allowed))
 
 
 class TestParamsPropagation(RewardSpaceTestBase):
@@ -181,7 +181,7 @@ class TestParamsPropagation(RewardSpaceTestBase):
         _assert_cli_success(self, result)
         manifest_path = out_dir / "manifest.json"
         self.assertTrue(manifest_path.exists(), "Missing manifest.json")
-        with open(manifest_path, "r") as f:
+        with manifest_path.open() as f:
             manifest = json.load(f)
         self.assertIn("reward_params", manifest)
         self.assertIn("simulation_params", manifest)
@@ -208,7 +208,7 @@ class TestParamsPropagation(RewardSpaceTestBase):
         _assert_cli_success(self, result)
         manifest_path = out_dir / "manifest.json"
         self.assertTrue(manifest_path.exists(), "Missing manifest.json")
-        with open(manifest_path, "r") as f:
+        with manifest_path.open() as f:
             manifest = json.load(f)
         self.assertIn("reward_params", manifest)
         self.assertIn("simulation_params", manifest)
index caca85b76e221e4a6e4454d8f9c04d91fee1504c..496fdfa49836741747d610725da0df87fcbf7558 100644 (file)
@@ -194,7 +194,7 @@ class TestRewardComponents(RewardSpaceTestBase):
 
         **Setup:**
         - PnL: 0.0 (breakeven)
-        - pnl_target: profit_aim × risk_reward_ratio
+        - pnl_target: profit_aim * risk_reward_ratio
         - Parameters: default base_params
 
         **Assertions:**
@@ -219,7 +219,7 @@ class TestRewardComponents(RewardSpaceTestBase):
 
         **Setup:**
         - PnL: 150% of pnl_target (exceeds target by 50%)
-        - pnl_target: 0.045 (profit_aim=0.03 × risk_reward_ratio=1.5)
+        - pnl_target: 0.045 (profit_aim=0.03 * risk_reward_ratio=1.5)
         - Parameters: win_reward_factor=2.0, pnl_factor_beta=0.5
 
         **Assertions:**
@@ -250,7 +250,7 @@ class TestRewardComponents(RewardSpaceTestBase):
 
         **Setup:**
         - PnL: -0.06 (exceeds pnl_target magnitude)
-        - pnl_target: 0.045 (profit_aim=0.03 × risk_reward_ratio=1.5)
+        - pnl_target: 0.045 (profit_aim=0.03 * risk_reward_ratio=1.5)
         - Penalty threshold: pnl < -pnl_target = -0.045
         - Parameters: win_reward_factor=2.0, pnl_factor_beta=0.5
 
@@ -381,7 +381,7 @@ class TestRewardComponents(RewardSpaceTestBase):
         **Setup:**
         - PnL: -0.005 (very close to min_unrealized_profit=-0.006)
         - Efficiency ratio: (-0.005 - (-0.006)) / (0.0 - (-0.006)) ≈ 0.167 (low)
-        - For losses: coefficient = 1 + weight × (center - ratio) → rewards low ratio
+        - For losses: coefficient = 1 + weight * (center - ratio) → rewards low ratio
         - efficiency_weight: 1.0, efficiency_center: 0.5
         - Trade context: Long position cutting losses quickly
 
@@ -620,7 +620,7 @@ class TestRewardComponents(RewardSpaceTestBase):
             pnl_ratio = pnl / pnl_target
             expected = 1.0 + win_reward_factor * math.tanh(beta * (pnl_ratio - 1.0))
             expected_ratios.append(expected)
-        for obs, exp in zip(ratios_observed, expected_ratios):
+        for obs, exp in zip(ratios_observed, expected_ratios, strict=False):
             self.assertFinite(obs, name="observed_ratio")
             self.assertFinite(exp, name="expected_ratio")
             self.assertLess(
@@ -634,7 +634,7 @@ class TestRewardComponents(RewardSpaceTestBase):
 
         Verifies:
         - max_idle_duration = None → use max_trade_duration as fallback
-        - penalty(duration=40) ≈ 2 × penalty(duration=20)
+        - penalty(duration=40) ≈ 2 * penalty(duration=20)
         - Proportional scaling with idle duration
         """
         base_factor = PARAMS.BASE_FACTOR
index 731f9979f2729faf9e50cc6c3fdac3b816a3a01b..8db775b4a20fa3eb8a871f783aee1ba69659b1d9 100644 (file)
@@ -408,22 +408,22 @@ STAT_TOL: Final[StatisticalTolerances] = StatisticalTolerances()
 
 
 __all__ = [
-    "ToleranceConfig",
+    "CONTINUITY",
+    "EXIT_FACTOR",
+    "PARAMS",
+    "PBRS",
+    "SCENARIOS",
+    "SEEDS",
+    "STATISTICAL",
+    "STAT_TOL",
+    "TOLERANCE",
     "ContinuityConfig",
     "ExitFactorConfig",
     "PBRSConfig",
     "StatisticalConfig",
-    "TestSeeds",
+    "StatisticalTolerances",
     "TestParameters",
     "TestScenarios",
-    "StatisticalTolerances",
-    "TOLERANCE",
-    "CONTINUITY",
-    "EXIT_FACTOR",
-    "PBRS",
-    "STATISTICAL",
-    "SEEDS",
-    "PARAMS",
-    "SCENARIOS",
-    "STAT_TOL",
+    "TestSeeds",
+    "ToleranceConfig",
 ]
index 7160a87332742d3ff7ce4da4989d5eff2ce6ea1b..e90a6e7291e47a75e37fdec9b6048399bfec956b 100644 (file)
@@ -60,52 +60,52 @@ from .warnings import (
 )
 
 __all__ = [
-    "assert_monotonic_nonincreasing",
-    "assert_monotonic_nonnegative",
-    "assert_finite",
+    "DEFAULT_REWARD_CONFIG",
+    "DEFAULT_SIMULATION_CONFIG",
+    "ContextFactory",
+    "ExitFactorConfig",
+    "ProgressiveScalingConfig",
+    "RewardScenarioConfig",
+    "SimulationConfig",
+    "StatisticalTestConfig",
+    "ThresholdTestConfig",
+    "ValidationCallback",
+    "ValidationConfig",
+    "WarningCaptureConfig",
+    "assert_adjustment_reason_contains",
     "assert_almost_equal_list",
-    "assert_trend",
     "assert_component_sum_integrity",
-    "assert_progressive_scaling_behavior",
-    "assert_single_active_component",
-    "assert_single_active_component_with_additives",
-    "assert_reward_calculation_scenarios",
-    "assert_parameter_sensitivity_behavior",
-    "make_idle_penalty_test_contexts",
+    "assert_diagnostic_warning",
     "assert_exit_factor_attenuation_modes",
+    "assert_exit_factor_invariant_suite",
+    "assert_exit_factor_kernel_fallback",
     "assert_exit_factor_plateau_behavior",
     "assert_exit_mode_mathematical_validation",
-    "assert_multi_parameter_sensitivity",
+    "assert_finite",
     "assert_hold_penalty_threshold_behavior",
-    "safe_float",
-    "build_validation_case",
-    "execute_validation_batch",
-    "assert_adjustment_reason_contains",
-    "run_strict_validation_failure_cases",
-    "run_relaxed_validation_adjustment_cases",
-    "assert_exit_factor_invariant_suite",
-    "assert_exit_factor_kernel_fallback",
-    "assert_relaxed_multi_reason_aggregation",
-    "assert_pbrs_invariance_report_classification",
-    "assert_pbrs_canonical_sum_within_tolerance",
+    "assert_monotonic_nonincreasing",
+    "assert_monotonic_nonnegative",
+    "assert_multi_parameter_sensitivity",
+    "assert_no_warnings",
     "assert_non_canonical_shaping_exceeds",
+    "assert_parameter_sensitivity_behavior",
+    "assert_pbrs_canonical_sum_within_tolerance",
+    "assert_pbrs_invariance_report_classification",
+    "assert_progressive_scaling_behavior",
+    "assert_relaxed_multi_reason_aggregation",
+    "assert_reward_calculation_scenarios",
+    "assert_single_active_component",
+    "assert_single_active_component_with_additives",
+    "assert_trend",
+    "build_validation_case",
     "calculate_reward_with_defaults",
+    "capture_warnings",
+    "execute_validation_batch",
     "get_exit_factor_with_defaults",
+    "make_idle_penalty_test_contexts",
+    "run_relaxed_validation_adjustment_cases",
+    "run_strict_validation_failure_cases",
+    "safe_float",
     "simulate_samples_with_defaults",
-    "RewardScenarioConfig",
-    "ValidationConfig",
-    "ThresholdTestConfig",
-    "ProgressiveScalingConfig",
-    "ExitFactorConfig",
-    "StatisticalTestConfig",
-    "SimulationConfig",
-    "WarningCaptureConfig",
-    "ValidationCallback",
-    "ContextFactory",
-    "DEFAULT_REWARD_CONFIG",
-    "DEFAULT_SIMULATION_CONFIG",
-    "capture_warnings",
-    "assert_diagnostic_warning",
-    "assert_no_warnings",
     "validate_warning_content",
 ]
index 530af449895722d47d1822470e408741b0c6e436..76b6cc191e90e1f861ff65933d9840d50ee168ee 100644 (file)
@@ -4,7 +4,9 @@ These functions centralize common numeric and behavioral checks to enforce
 single invariant ownership and reduce duplication across taxonomy modules.
 """
 
-from typing import Any, Dict, List, Sequence, Tuple
+import itertools
+from collections.abc import Sequence
+from typing import Any
 
 import numpy as np
 
@@ -358,7 +360,7 @@ def assert_single_active_component_with_additives(
 
 def assert_reward_calculation_scenarios(
     test_case,
-    scenarios: List[Tuple[Any, Dict[str, Any], str]],
+    scenarios: list[tuple[Any, dict[str, Any], str]],
     config: RewardScenarioConfig,
     validation_fn,
 ):
@@ -405,9 +407,9 @@ def assert_reward_calculation_scenarios(
 
 def assert_parameter_sensitivity_behavior(
     test_case,
-    parameter_variations: List[Dict[str, Any]],
+    parameter_variations: list[dict[str, Any]],
     base_context,
-    base_params: Dict[str, Any],
+    base_params: dict[str, Any],
     component_name: str,
     expected_trend: str,
     config: RewardScenarioConfig,
@@ -486,7 +488,7 @@ def assert_parameter_sensitivity_behavior(
 def make_idle_penalty_test_contexts(
     context_factory_fn,
     idle_duration_scenarios: Sequence[int],
-    base_context_kwargs: Dict[str, Any] | None = None,
+    base_context_kwargs: dict[str, Any] | None = None,
 ):
     """Generate contexts for idle penalty testing with varying durations.
 
@@ -541,7 +543,7 @@ def assert_exit_factor_attenuation_modes(
         test_case: Test case instance with assertion methods
         base_factor: Base scaling factor
         pnl: Realized profit/loss
-        pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio)
+        pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio)
         context: RewardContext for efficiency coefficient calculation
         attenuation_modes: List of mode names to test
         base_params_fn: Factory function for creating parameter dicts
@@ -588,12 +590,14 @@ def assert_exit_factor_attenuation_modes(
             if mode == "plateau_linear":
                 grace = float(mode_params["exit_plateau_grace"])
                 filtered = [
-                    (r, v) for r, v in zip(ratios, values) if r >= grace - tolerance_relaxed
+                    (r, v)
+                    for r, v in zip(ratios, values, strict=False)
+                    if r >= grace - tolerance_relaxed
                 ]
                 values_to_check = [v for _, v in filtered]
             else:
                 values_to_check = values
-            for earlier, later in zip(values_to_check, values_to_check[1:]):
+            for earlier, later in itertools.pairwise(values_to_check):
                 test_case.assertLessEqual(
                     later, earlier + tolerance_relaxed, f"Non-monotonic attenuation in mode={mode}"
                 )
@@ -602,7 +606,7 @@ def assert_exit_factor_attenuation_modes(
 def assert_exit_mode_mathematical_validation(
     test_case,
     context,
-    params: Dict[str, Any],
+    params: dict[str, Any],
     base_factor: float,
     profit_aim: float,
     risk_reward_ratio: float,
@@ -704,16 +708,16 @@ def assert_exit_mode_mathematical_validation(
         reward_half_life.exit_component,
         reward_linear.exit_component,
     ]
-    test_case.assertTrue(all((r > 0 for r in rewards)))
-    unique_rewards = set((f"{r:.6f}" for r in rewards))
+    test_case.assertTrue(all(r > 0 for r in rewards))
+    unique_rewards = {f"{r:.6f}" for r in rewards}
     test_case.assertGreater(len(unique_rewards), 1)
 
 
 def assert_multi_parameter_sensitivity(
     test_case,
-    parameter_test_cases: List[Tuple[float, float, str]],
+    parameter_test_cases: list[tuple[float, float, str]],
     context_factory_fn,
-    base_params: Dict[str, Any],
+    base_params: dict[str, Any],
     config: RewardScenarioConfig,
 ):
     """Validate reward behavior across multiple parameter combinations.
@@ -781,7 +785,7 @@ def assert_multi_parameter_sensitivity(
 def assert_hold_penalty_threshold_behavior(
     test_case,
     context_factory_fn,
-    params: Dict[str, Any],
+    params: dict[str, Any],
     base_factor: float,
     profit_aim: float,
     risk_reward_ratio: float,
@@ -842,11 +846,11 @@ def assert_hold_penalty_threshold_behavior(
 
 
 def build_validation_case(
-    param_updates: Dict[str, Any],
+    param_updates: dict[str, Any],
     strict: bool,
     expect_error: bool = False,
     expected_reason_substrings: Sequence[str] | None = None,
-) -> Dict[str, Any]:
+) -> dict[str, Any]:
     """Build a structured validation test case descriptor.
 
     Creates a standardized test case dictionary for parameter validation testing,
@@ -876,7 +880,7 @@ def build_validation_case(
     }
 
 
-def execute_validation_batch(test_case, cases: Sequence[Dict[str, Any]], validate_fn):
+def execute_validation_batch(test_case, cases: Sequence[dict[str, Any]], validate_fn):
     """Execute a batch of parameter validation test cases.
 
     Runs multiple validation scenarios in batch, handling both strict (error-raising)
@@ -903,7 +907,7 @@ def execute_validation_batch(test_case, cases: Sequence[Dict[str, Any]], validat
             params = case["params"].copy()
             strict_flag = case["strict"]
             if strict_flag and case["expect_error"]:
-                test_case.assertRaises(Exception, validate_fn, params, True)
+                test_case.assertRaises(ValueError, validate_fn, params, True)
                 continue
             result = validate_fn(params, strict=strict_flag)
             if isinstance(result, tuple) and len(result) == 2 and isinstance(result[0], dict):
@@ -922,7 +926,7 @@ def execute_validation_batch(test_case, cases: Sequence[Dict[str, Any]], validat
 
 
 def assert_adjustment_reason_contains(
-    test_case, adjustments: Dict[str, Dict[str, Any]], key: str, expected_substrings: Sequence[str]
+    test_case, adjustments: dict[str, dict[str, Any]], key: str, expected_substrings: Sequence[str]
 ):
     """Assert adjustment reason contains all expected substrings.
 
@@ -953,7 +957,7 @@ def assert_adjustment_reason_contains(
 
 
 def run_strict_validation_failure_cases(
-    test_case, failure_params_list: Sequence[Dict[str, Any]], validate_fn
+    test_case, failure_params_list: Sequence[dict[str, Any]], validate_fn
 ):
     """Batch test strict validation failures.
 
@@ -983,7 +987,7 @@ def run_strict_validation_failure_cases(
 
 def run_relaxed_validation_adjustment_cases(
     test_case,
-    relaxed_cases: Sequence[Tuple[Dict[str, Any], Sequence[str]]],
+    relaxed_cases: Sequence[tuple[dict[str, Any], Sequence[str]]],
     validate_fn,
 ):
     """Batch test relaxed validation adjustments.
@@ -1020,7 +1024,7 @@ def run_relaxed_validation_adjustment_cases(
 
 
 def assert_exit_factor_invariant_suite(
-    test_case, suite_cases: Sequence[Dict[str, Any]], exit_factor_fn
+    test_case, suite_cases: Sequence[dict[str, Any]], exit_factor_fn
 ):
     """Validate exit factor invariants across multiple scenarios.
 
@@ -1033,7 +1037,7 @@ def assert_exit_factor_invariant_suite(
         suite_cases: List of scenario dicts with keys:
             - base_factor: Base scaling factor
             - pnl: Realized profit/loss
-            - pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio) for coefficient calculation
+            - pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio) for coefficient calculation
             - context: RewardContext for efficiency coefficient
             - duration_ratio: Duration ratio (0-2)
             - params: Parameter dictionary
@@ -1088,8 +1092,8 @@ def assert_exit_factor_kernel_fallback(
     pnl_target: float,
     duration_ratio: float,
     context,
-    bad_params: Dict[str, Any],
-    reference_params: Dict[str, Any],
+    bad_params: dict[str, Any],
+    reference_params: dict[str, Any],
     risk_reward_ratio: float,
 ):
     """Validate exit factor fallback behavior on kernel failure.
@@ -1141,8 +1145,8 @@ def assert_exit_factor_kernel_fallback(
 def assert_relaxed_multi_reason_aggregation(
     test_case,
     validate_fn,
-    params: Dict[str, Any],
-    key_expectations: Dict[str, Sequence[str]],
+    params: dict[str, Any],
+    key_expectations: dict[str, Sequence[str]],
 ):
     """Validate relaxed validation produces expected adjustment reasons.
 
@@ -1268,7 +1272,7 @@ def assert_exit_factor_plateau_behavior(
         exit_factor_fn: Exit factor calculation function (_get_exit_factor)
         base_factor: Base factor for exit calculation
         pnl: PnL value
-        pnl_target: Target profit threshold (pnl_target = profit_aim × risk_reward_ratio) for coefficient calculation
+        pnl_target: Target profit threshold (pnl_target = profit_aim * risk_reward_ratio) for coefficient calculation
         context: RewardContext for efficiency coefficient
         plateau_params: Parameters dict with plateau configuration
         grace: Grace period threshold (exit_plateau_grace value)
@@ -1314,7 +1318,7 @@ def assert_exit_factor_plateau_behavior(
 
 def calculate_reward_with_defaults(
     context,
-    params: Dict[str, Any],
+    params: dict[str, Any],
     config: RewardScenarioConfig | None = None,
     **overrides,
 ):
@@ -1376,7 +1380,7 @@ def get_exit_factor_with_defaults(
     pnl: float,
     duration_ratio: float,
     context,
-    params: Dict[str, Any],
+    params: dict[str, Any],
     base_factor: float | None = None,
     pnl_target: float | None = None,
     risk_reward_ratio: float | None = None,
@@ -1427,7 +1431,7 @@ def get_exit_factor_with_defaults(
 
 
 def simulate_samples_with_defaults(
-    params: Dict[str, Any],
+    params: dict[str, Any],
     config: SimulationConfig | None = None,
     base_factor: float | None = None,
     profit_aim: float | None = None,
index 12742dd162f0f186681c3875b125cb7e69a554a1..d3ad700dcba2d782939cea5d1fe534e2ab86504a 100644 (file)
@@ -21,8 +21,8 @@ Usage:
     ... )
 """
 
+from collections.abc import Callable
 from dataclasses import dataclass
-from typing import Callable, Optional
 
 from ..constants import PARAMS, SEEDS, STATISTICAL, TOLERANCE
 
@@ -67,7 +67,7 @@ class ValidationConfig:
 
     tolerance_strict: float = TOLERANCE.IDENTITY_STRICT
     tolerance_relaxed: float = TOLERANCE.IDENTITY_RELAXED
-    exclude_components: Optional[list[str]] = None
+    exclude_components: list[str] | None = None
     component_description: str = "reward components"
 
 
@@ -117,7 +117,7 @@ class ExitFactorConfig:
     decomposition, attenuation mode and plateau behavior.
 
     The exit factor is computed as:
-        exit_factor = base_factor × time_attenuation × pnl_target × efficiency
+        exit_factor = base_factor * time_attenuation * pnl_target * efficiency
 
     Attributes:
         base_factor: Base scaling factor
@@ -160,7 +160,7 @@ class StatisticalTestConfig:
     n_bootstrap: int = STATISTICAL.BOOTSTRAP_DEFAULT_ITERATIONS
     confidence_level: float = 0.95
     seed: int = SEEDS.BASE
-    adjust_method: Optional[str] = None
+    adjust_method: str | None = None
     alpha: float = 0.05
 
 
@@ -236,16 +236,16 @@ DEFAULT_SIMULATION_CONFIG: SimulationConfig = SimulationConfig(
 
 
 __all__ = [
-    "RewardScenarioConfig",
-    "ValidationConfig",
-    "ThresholdTestConfig",
-    "ProgressiveScalingConfig",
+    "DEFAULT_REWARD_CONFIG",
+    "DEFAULT_SIMULATION_CONFIG",
+    "ContextFactory",
     "ExitFactorConfig",
-    "StatisticalTestConfig",
+    "ProgressiveScalingConfig",
+    "RewardScenarioConfig",
     "SimulationConfig",
-    "WarningCaptureConfig",
+    "StatisticalTestConfig",
+    "ThresholdTestConfig",
     "ValidationCallback",
-    "ContextFactory",
-    "DEFAULT_REWARD_CONFIG",
-    "DEFAULT_SIMULATION_CONFIG",
+    "ValidationConfig",
+    "WarningCaptureConfig",
 ]
index 9de13195f5b708cc041884c064cf14651bf2f08c..fb12852899a0d932838b893403d7b10f0e08f8be 100644 (file)
@@ -16,7 +16,7 @@ Usage:
 
 import warnings
 from contextlib import contextmanager
-from typing import Any, Optional
+from typing import Any
 
 import reward_space_analysis
 
@@ -55,7 +55,7 @@ def capture_warnings(warning_category: type[Warning] = Warning, always_capture:
 @contextmanager
 def assert_diagnostic_warning(
     expected_substrings: list[str],
-    warning_category: Optional[type[Warning]] = None,
+    warning_category: type[Warning] | None = None,
     strict_mode: bool = True,
 ):
     """Context manager that captures warnings and asserts their presence.
@@ -192,8 +192,8 @@ def validate_warning_content(
 
 
 __all__ = [
-    "capture_warnings",
     "assert_diagnostic_warning",
     "assert_no_warnings",
+    "capture_warnings",
     "validate_warning_content",
 ]
index e1eadef1331935541ffaf29ae3ae18cdc97bc7b3..e48c17a333ace24b96a9070c8cf63f65468c7f03 100644 (file)
@@ -90,7 +90,7 @@ class TestIntegration(RewardSpaceTestBase):
         _assert_cli_success(self, result2)
 
         for run_dir in ["run1", "run2"]:
-            with open(self.output_path / run_dir / "manifest.json", "r") as f:
+            with (self.output_path / run_dir / "manifest.json").open() as f:
                 manifest = json.load(f)
             required_keys = {
                 "generated_at",
@@ -112,9 +112,9 @@ class TestIntegration(RewardSpaceTestBase):
             self.assertEqual(manifest["num_samples"], SCENARIOS.SAMPLE_SIZE_SMALL)
             self.assertEqual(manifest["seed"], SEEDS.BASE)
 
-        with open(self.output_path / "run1" / "manifest.json", "r") as f:
+        with (self.output_path / "run1" / "manifest.json").open() as f:
             manifest1 = json.load(f)
-        with open(self.output_path / "run2" / "manifest.json", "r") as f:
+        with (self.output_path / "run2" / "manifest.json").open() as f:
             manifest2 = json.load(f)
         self.assertEqual(
             manifest1["params_hash"],
index aaac0d37738a4f68add9bd00eae0e37a5312e6ed..85dc8e2f4e3fbdcb1403d4142e4eaadf69e3e1fa 100644 (file)
@@ -492,8 +492,8 @@ class TestPBRS(RewardSpaceTestBase):
         terminal_next_potentials, shaping_values = self._canonical_sweep(params)
         self.assertEqual(params, params_before)
         if terminal_next_potentials:
-            self.assertTrue(all((abs(p) < PBRS.TERMINAL_TOL for p in terminal_next_potentials)))
-        max_abs = max((abs(v) for v in shaping_values)) if shaping_values else 0.0
+            self.assertTrue(all(abs(p) < PBRS.TERMINAL_TOL for p in terminal_next_potentials))
+        max_abs = max(abs(v) for v in shaping_values) if shaping_values else 0.0
         self.assertLessEqual(max_abs, PBRS.MAX_ABS_SHAPING)
 
     def test_progressive_release_negative_decay_clamped(self):
@@ -528,7 +528,7 @@ class TestPBRS(RewardSpaceTestBase):
             gamma = float(gamma_fallback)
         except Exception:
             gamma = 0.95
-        # PBRS shaping Δ = γ·Φ(next) − Φ(prev). Here Φ(next)=Φ(prev) since decay clamps to 0.
+        # PBRS shaping Δ = γ·Φ(next) − Φ(prev). Here Φ(next)=Φ(prev) since decay clamps to 0.  # noqa: RUF003
         self.assertLessEqual(
             abs(shaping - ((gamma - 1.0) * prev_potential)),
             TOLERANCE.GENERIC_EQ,
@@ -788,7 +788,7 @@ class TestPBRS(RewardSpaceTestBase):
         )
         execute_validation_batch(
             self,
-            [success_case] + strict_failures + [relaxed_case],
+            [success_case, *strict_failures, relaxed_case],
             validate_reward_parameters,
         )
         params_relaxed = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
@@ -815,13 +815,13 @@ class TestPBRS(RewardSpaceTestBase):
     def test_compute_exit_potential_mode_differences(self):
         """Exit potential modes: canonical vs spike_cancel shaping magnitude differences."""
         gamma = 0.93
-        base_common = dict(
-            hold_potential_enabled=True,
-            potential_gamma=gamma,
-            entry_additive_enabled=False,
-            exit_additive_enabled=False,
-            hold_potential_ratio=1.0,
-        )
+        base_common = {
+            "hold_potential_enabled": True,
+            "potential_gamma": gamma,
+            "entry_additive_enabled": False,
+            "exit_additive_enabled": False,
+            "hold_potential_ratio": 1.0,
+        }
         ctx_pnl = 0.012
         ctx_dur_ratio = 0.3
         params_can = self.base_params(exit_potential_mode="canonical", **base_common)
@@ -1113,7 +1113,7 @@ class TestPBRS(RewardSpaceTestBase):
             self.assertLessEqual(abs(shap), PBRS.MAX_ABS_SHAPING)
 
             # With bounded transforms and hold_potential_ratio=1:
-            # |Φ(s)| <= base_factor and |Δ| <= (1+γ)*base_factor
+            # |Φ(s)| <= base_factor and |Δ| <= (1+γ)*base_factor  # noqa: RUF003
             self.assertLessEqual(abs(float(shap)), (1.0 + gamma) * PARAMS.BASE_FACTOR)
 
     def test_report_cumulative_invariance_aggregation(self):
@@ -1159,10 +1159,7 @@ class TestPBRS(RewardSpaceTestBase):
             if abs(inc) > max_abs_step:
                 max_abs_step = abs(inc)
             steps += 1
-            if is_exit:
-                prev_potential = 0.0
-            else:
-                prev_potential = next_potential
+            prev_potential = 0.0 if is_exit else next_potential
         mean_drift = telescoping_sum / max(1, steps)
         self.assertLess(
             abs(mean_drift),
index 6962da7d05cd68ce9ab28608a4332a67b5cd1273..490cec6429f9691ef30748533de71d301d31c93e 100644 (file)
@@ -44,8 +44,8 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
     def test_decomposition_integrity(self):
         """reward must equal the single active core component under mutually exclusive scenarios (idle/hold/exit/invalid)."""
         scenarios = [
-            dict(
-                ctx=self.make_ctx(
+            {
+                "ctx": self.make_ctx(
                     pnl=0.0,
                     trade_duration=0,
                     idle_duration=25,
@@ -54,10 +54,10 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
                     position=Positions.Neutral,
                     action=Actions.Neutral,
                 ),
-                active="idle_penalty",
-            ),
-            dict(
-                ctx=self.make_ctx(
+                "active": "idle_penalty",
+            },
+            {
+                "ctx": self.make_ctx(
                     pnl=0.0,
                     trade_duration=150,
                     idle_duration=0,
@@ -66,10 +66,10 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
                     position=Positions.Long,
                     action=Actions.Neutral,
                 ),
-                active="hold_penalty",
-            ),
-            dict(
-                ctx=self.make_ctx(
+                "active": "hold_penalty",
+            },
+            {
+                "ctx": self.make_ctx(
                     pnl=PARAMS.PROFIT_AIM,
                     trade_duration=60,
                     idle_duration=0,
@@ -78,10 +78,10 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
                     position=Positions.Long,
                     action=Actions.Long_exit,
                 ),
-                active="exit_component",
-            ),
-            dict(
-                ctx=self.make_ctx(
+                "active": "exit_component",
+            },
+            {
+                "ctx": self.make_ctx(
                     pnl=0.01,
                     trade_duration=10,
                     idle_duration=0,
@@ -90,8 +90,8 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
                     position=Positions.Short,
                     action=Actions.Long_exit,
                 ),
-                active="invalid_penalty",
-            ),
+                "active": "invalid_penalty",
+            },
         ]
         for sc in scenarios:
             ctx_obj = sc["ctx"]
@@ -178,7 +178,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
         )
 
         # Part 2: Monotonic attenuation validation
-        modes = list(ATTENUATION_MODES) + ["plateau_linear"]
+        modes = [*list(ATTENUATION_MODES), "plateau_linear"]
         test_pnl = 0.05
         test_context = self.make_ctx(
             pnl=test_pnl,
@@ -232,12 +232,9 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
         self.assertTrue(runtime_warnings)
         self.assertTrue(
             any(
-                (
-                    ">" in str(w.message)
-                    and "threshold" in str(w.message)
-                    or "|exit_factor|=" in str(w.message)
-                    for w in runtime_warnings
-                )
+                (">" in str(w.message) and "threshold" in str(w.message))
+                or "|exit_factor|=" in str(w.message)
+                for w in runtime_warnings
             )
         )
 
@@ -298,10 +295,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
                 params,
                 PARAMS.RISK_REWARD_RATIO,
             )
-            if 0.0 < tau <= 1.0:
-                alpha = -math.log(tau) / math.log(2.0)
-            else:
-                alpha = 1.0
+            alpha = -math.log(tau) / math.log(2.0) if 0.0 < tau <= 1.0 else 1.0
             expected_ratio = 1.0 / (1.0 + duration_ratio) ** alpha
             observed_ratio = f1 / f0 if f0 != 0 else np.nan
             self.assertFinite(observed_ratio, name="observed_ratio")
@@ -656,7 +650,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
             f"Scaling ratio too large (ratio={ratio:.2f})",
         )
 
-    # === Robustness invariants 102–105 ===
+    # === Robustness invariants 102–105 ===  # noqa: RUF003
     # Owns invariant: robustness-exit-mode-fallback-102
     def test_robustness_102_unknown_exit_mode_fallback_linear(self):
         """Invariant 102: Unknown exit_attenuation_mode gracefully warns and falls back to linear kernel."""
index 2147c239f97d9bb9049f9dbf7fecf727808a9a02..5434718a96676a25d91c90fd1f84db4fd2f2c9e4 100644 (file)
@@ -80,7 +80,7 @@ def test_feature_analysis_empty_frame():
     - model is None
     """
     df = _minimal_df(0)  # empty
-    importance_df, stats, partial_deps, model = _perform_feature_analysis(
+    importance_df, stats, _partial_deps, model = _perform_feature_analysis(
         df, seed=SEEDS.FEATURE_EMPTY, skip_partial_dependence=True
     )
     assert importance_df.empty
@@ -102,7 +102,7 @@ def test_feature_analysis_single_feature_path():
     """
     rng = np.random.default_rng(SEEDS.FEATURE_PRIME_11)
     df = pd.DataFrame({"pnl": rng.normal(0, 1, 25), "reward": rng.normal(0, 1, 25)})
-    importance_df, stats, partial_deps, model = _perform_feature_analysis(
+    importance_df, stats, _partial_deps, model = _perform_feature_analysis(
         df, seed=SEEDS.FEATURE_PRIME_11, skip_partial_dependence=True
     )
     assert stats["n_features"] == 1
@@ -132,7 +132,7 @@ def test_feature_analysis_nans_present_path():
             "reward": rng.normal(0, 1, 40),
         }
     )
-    importance_df, stats, partial_deps, model = _perform_feature_analysis(
+    importance_df, stats, _partial_deps, model = _perform_feature_analysis(
         df, seed=SEEDS.FEATURE_PRIME_13, skip_partial_dependence=True
     )
     # Should hit NaN stub path (model_fitted False)
@@ -161,12 +161,12 @@ def test_feature_analysis_model_fitting_failure(monkeypatch):
     if RandomForestRegressor is None:  # type: ignore[comparison-overlap]
         pytest.skip("sklearn components unavailable; skipping model fitting failure test")
 
-    def boom(self, *a, **kw):  # noqa: D401
+    def boom(self, *a, **kw):
         raise RuntimeError("forced fit failure")
 
     monkeypatch.setattr(RandomForestRegressor, "fit", boom)
     df = _minimal_df(50)
-    importance_df, stats, partial_deps, model = _perform_feature_analysis(
+    importance_df, stats, _partial_deps, model = _perform_feature_analysis(
         df, seed=SEEDS.FEATURE_PRIME_21, skip_partial_dependence=True
     )
     assert stats["model_fitted"] is False
@@ -194,7 +194,7 @@ def test_feature_analysis_permutation_failure_partial_dependence(monkeypatch):
     """
 
     # Monkeypatch permutation_importance to raise while allowing partial dependence
-    def perm_boom(*a, **kw):  # noqa: D401
+    def perm_boom(*a, **kw):
         raise RuntimeError("forced permutation failure")
 
     monkeypatch.setattr("reward_space_analysis.permutation_importance", perm_boom)
@@ -249,7 +249,7 @@ def test_feature_analysis_import_fallback(monkeypatch):
 
 
 def test_module_level_sklearn_import_failure_reload():
-    """Force module-level sklearn import failure to execute fallback block (lines 3242).
+    """Force module-level sklearn import failure to execute fallback block (lines 32-42).
 
     Strategy:
     - Temporarily monkeypatch builtins.__import__ to raise on any 'sklearn' import.
@@ -261,7 +261,7 @@ def test_module_level_sklearn_import_failure_reload():
     orig_mod = sys.modules.get("reward_space_analysis")
     orig_import = builtins.__import__
 
-    def fake_import(name, *args, **kwargs):  # noqa: D401
+    def fake_import(name, *args, **kwargs):
         if name.startswith("sklearn"):
             raise RuntimeError("forced sklearn import failure")
         return orig_import(name, *args, **kwargs)
@@ -274,10 +274,10 @@ def test_module_level_sklearn_import_failure_reload():
         reloaded_module = importlib.import_module("reward_space_analysis")
 
         # Fallback assigns sklearn symbols to None
-        assert getattr(reloaded_module, "RandomForestRegressor") is None
-        assert getattr(reloaded_module, "train_test_split") is None
-        assert getattr(reloaded_module, "permutation_importance") is None
-        assert getattr(reloaded_module, "r2_score") is None
+        assert reloaded_module.RandomForestRegressor is None
+        assert reloaded_module.train_test_split is None
+        assert reloaded_module.permutation_importance is None
+        assert reloaded_module.r2_score is None
         # Perform feature analysis should raise ImportError under missing components
         df = _minimal_df(15)
         with pytest.raises(ImportError):
index e5542aa91fbd69ed444085fa6f4c6eef184f76c6..7487bfbe9d80e9a48b123abdfacb99ba1c2b2ba9 100644 (file)
@@ -45,7 +45,7 @@ class TestStatistics(RewardSpaceTestBase):
         # Use existing helper to get synthetic stats df (small for speed)
         df = self.make_stats_df(n=120, seed=SEEDS.BASE, idle_pattern="mixed")
         try:
-            importance_df, analysis_stats, partial_deps, model = _perform_feature_analysis(
+            importance_df, analysis_stats, partial_deps, _model = _perform_feature_analysis(
                 df, seed=SEEDS.BASE, skip_partial_dependence=True, rf_n_jobs=1, perm_n_jobs=1
             )
         except ImportError:
@@ -135,15 +135,13 @@ class TestStatistics(RewardSpaceTestBase):
         for metric_name, value in metrics.items():
             if "pnl" in metric_name:
                 if any(
-                    (
-                        suffix in metric_name
-                        for suffix in [
-                            "js_distance",
-                            "ks_statistic",
-                            "wasserstein",
-                            "kl_divergence",
-                        ]
-                    )
+                    suffix in metric_name
+                    for suffix in [
+                        "js_distance",
+                        "ks_statistic",
+                        "wasserstein",
+                        "kl_divergence",
+                    ]
                 ):
                     self.assertDistanceMetric(value, name=metric_name)
                 else:
@@ -180,7 +178,7 @@ class TestStatistics(RewardSpaceTestBase):
                     "Idle duration and reward arrays should have same length",
                 )
                 self.assertTrue(
-                    all((d >= 0 for d in idle_dur)), "Idle durations should be non-negative"
+                    all(d >= 0 for d in idle_dur), "Idle durations should be non-negative"
                 )
                 negative_rewards = (idle_rew < 0).sum()
                 total_rewards = len(idle_rew)
@@ -231,7 +229,7 @@ class TestStatistics(RewardSpaceTestBase):
         diagnostics = distribution_diagnostics(df)
         expected_prefixes = ["reward_", "pnl_"]
         for prefix in expected_prefixes:
-            matching_keys = [key for key in diagnostics.keys() if key.startswith(prefix)]
+            matching_keys = [key for key in diagnostics if key.startswith(prefix)]
             self.assertGreater(len(matching_keys), 0, f"Should have diagnostics for {prefix}")
             expected_suffixes = ["mean", "std", "skewness", "kurtosis"]
             for suffix in expected_suffixes:
@@ -509,7 +507,7 @@ class TestStatistics(RewardSpaceTestBase):
             df, adjust_method="benjamini_hochberg", seed=SEEDS.REPRODUCIBILITY
         )
         self.assertGreater(len(results_adj), 0)
-        for name, res in results_adj.items():
+        for _name, res in results_adj.items():
             self.assertIn("p_value", res)
             self.assertIn("p_value_adj", res)
             self.assertIn("significant_adj", res)
@@ -542,8 +540,8 @@ class TestStatistics(RewardSpaceTestBase):
         large = self._shift_scale_df(SCENARIOS.SAMPLE_SIZE_LARGE)
         res_small = bootstrap_confidence_intervals(small, ["reward"], n_bootstrap=400)
         res_large = bootstrap_confidence_intervals(large, ["reward"], n_bootstrap=400)
-        _, lo_s, hi_s = list(res_small.values())[0]
-        _, lo_l, hi_l = list(res_large.values())[0]
+        _, lo_s, hi_s = next(iter(res_small.values()))
+        _, lo_l, hi_l = next(iter(res_large.values()))
         hw_small = (hi_s - lo_s) / 2.0
         hw_large = (hi_l - lo_l) / 2.0
         self.assertFinite(hw_small, name="hw_small")
index 0867b0873fce874cc7c4fdea79d8a0c492f155cb..86f3fd02d4bd2796660ab6245ae85c3629fe72a0 100644 (file)
@@ -1,13 +1,15 @@
 #!/usr/bin/env python3
 """Base class and utilities for reward space analysis tests."""
 
+import itertools
 import math
 import random
 import shutil
 import tempfile
 import unittest
+from collections.abc import Iterable, Sequence
 from pathlib import Path
-from typing import Any, Dict, Iterable, Optional, Sequence, Union
+from typing import Any
 
 import numpy as np
 import pandas as pd
@@ -61,7 +63,7 @@ PBRS_INTEGRATION_PARAMS = [
     "entry_additive_enabled",
     "exit_additive_enabled",
 ]
-PBRS_REQUIRED_PARAMS = PBRS_INTEGRATION_PARAMS + ["exit_potential_mode"]
+PBRS_REQUIRED_PARAMS = [*PBRS_INTEGRATION_PARAMS, "exit_potential_mode"]
 
 
 class RewardSpaceTestBase(unittest.TestCase):
@@ -105,9 +107,9 @@ class RewardSpaceTestBase(unittest.TestCase):
             action=action,
         )
 
-    def base_params(self, **overrides) -> Dict[str, Any]:
+    def base_params(self, **overrides) -> dict[str, Any]:
         """Return fresh copy of default reward params with overrides."""
-        params: Dict[str, Any] = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
+        params: dict[str, Any] = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
         params.update(overrides)
         return params
 
@@ -115,8 +117,8 @@ class RewardSpaceTestBase(unittest.TestCase):
         self,
         params: dict,
         *,
-        iterations: Optional[int] = None,
-        terminal_prob: Optional[float] = None,
+        iterations: int | None = None,
+        terminal_prob: float | None = None,
         seed: int = SEEDS.CANONICAL_SWEEP,
     ) -> tuple[list[float], list[float]]:
         """Run a lightweight canonical invariance sweep.
@@ -171,10 +173,10 @@ class RewardSpaceTestBase(unittest.TestCase):
         reward_mean: float = 0.0,
         reward_std: float = 1.0,
         pnl_mean: float = 0.01,
-        pnl_std: Optional[float] = None,
+        pnl_std: float | None = None,
         trade_duration_dist: str = "uniform",
         idle_pattern: str = "mixed",
-        seed: Optional[int] = None,
+        seed: int | None = None,
     ) -> pd.DataFrame:
         """Generate a synthetic statistical DataFrame.
 
@@ -235,11 +237,11 @@ class RewardSpaceTestBase(unittest.TestCase):
 
     def assertAlmostEqualFloat(
         self,
-        first: Union[float, int],
-        second: Union[float, int],
-        tolerance: Optional[float] = None,
-        rtol: Optional[float] = None,
-        msg: Union[str, None] = None,
+        first: float | int,
+        second: float | int,
+        tolerance: float | None = None,
+        rtol: float | None = None,
+        msg: str | None = None,
     ) -> None:
         """Compare floats with absolute and optional relative tolerance.
 
@@ -264,14 +266,14 @@ class RewardSpaceTestBase(unittest.TestCase):
             or f"Difference {diff} exceeds tolerance {tolerance} and relative tolerance {rtol} (a={first}, b={second})"
         )
 
-    def assertPValue(self, value: Union[float, int], msg: str = "") -> None:
+    def assertPValue(self, value: float | int, msg: str = "") -> None:
         """Assert a p-value is finite and within [0,1]."""
         self.assertFinite(value, name="p-value")
         self.assertGreaterEqual(value, 0.0, msg or f"p-value < 0: {value}")
         self.assertLessEqual(value, 1.0, msg or f"p-value > 1: {value}")
 
     def assertPlacesEqual(
-        self, a: Union[float, int], b: Union[float, int], places: int, msg: Optional[str] = None
+        self, a: float | int, b: float | int, places: int, msg: str | None = None
     ) -> None:
         """Bridge for legacy places-based approximate equality.
 
@@ -283,10 +285,10 @@ class RewardSpaceTestBase(unittest.TestCase):
 
     def assertDistanceMetric(
         self,
-        value: Union[float, int],
+        value: float | int,
         *,
         non_negative: bool = True,
-        upper: Optional[float] = None,
+        upper: float | None = None,
         name: str = "metric",
     ) -> None:
         """Generic distance/divergence bounds: finite, optional non-negativity and optional upper bound."""
@@ -298,7 +300,7 @@ class RewardSpaceTestBase(unittest.TestCase):
 
     def assertEffectSize(
         self,
-        value: Union[float, int],
+        value: float | int,
         *,
         lower: float = -1.0,
         upper: float = 1.0,
@@ -309,17 +311,17 @@ class RewardSpaceTestBase(unittest.TestCase):
         self.assertGreaterEqual(value, lower, f"{name} < {lower}: {value}")
         self.assertLessEqual(value, upper, f"{name} > {upper}: {value}")
 
-    def assertFinite(self, value: Union[float, int], name: str = "value") -> None:
+    def assertFinite(self, value: float | int, name: str = "value") -> None:
         """Assert scalar is finite."""
         if not np.isfinite(value):
             self.fail(f"{name} not finite: {value}")
 
     def assertMonotonic(
         self,
-        seq: Union[Sequence[Union[float, int]], Iterable[Union[float, int]]],
+        seq: Sequence[float | int] | Iterable[float | int],
         *,
-        non_increasing: Optional[bool] = None,
-        non_decreasing: Optional[bool] = None,
+        non_increasing: bool | None = None,
+        non_decreasing: bool | None = None,
         tolerance: float = 0.0,
         name: str = "sequence",
     ) -> None:
@@ -331,21 +333,20 @@ class RewardSpaceTestBase(unittest.TestCase):
         data = list(seq)
         if len(data) < 2:
             return
-        if non_increasing and non_decreasing or (not non_increasing and (not non_decreasing)):
+        if (non_increasing and non_decreasing) or (not non_increasing and (not non_decreasing)):
             self.fail("Specify exactly one monotonic direction")
-        for a, b in zip(data, data[1:]):
+        for a, b in itertools.pairwise(data):
             if non_increasing:
                 if b > a + tolerance:
                     self.fail(f"{name} not non-increasing at pair ({a}, {b})")
-            elif non_decreasing:
-                if b + tolerance < a:
-                    self.fail(f"{name} not non-decreasing at pair ({a}, {b})")
+            elif non_decreasing and b + tolerance < a:
+                self.fail(f"{name} not non-decreasing at pair ({a}, {b})")
 
     def assertWithin(
         self,
-        value: Union[float, int],
-        low: Union[float, int],
-        high: Union[float, int],
+        value: float | int,
+        low: float | int,
+        high: float | int,
         *,
         name: str = "value",
         inclusive: bool = True,
@@ -360,7 +361,7 @@ class RewardSpaceTestBase(unittest.TestCase):
             self.assertLess(value, high, f"{name} >= {high}")
 
     def assertNearZero(
-        self, value: Union[float, int], *, atol: Optional[float] = None, msg: Optional[str] = None
+        self, value: float | int, *, atol: float | None = None, msg: str | None = None
     ) -> None:
         """Assert a scalar is numerically near zero within absolute tolerance.
 
@@ -377,9 +378,9 @@ class RewardSpaceTestBase(unittest.TestCase):
         a,
         b,
         *,
-        atol: Optional[float] = None,
-        rtol: Optional[float] = None,
-        msg: Optional[str] = None,
+        atol: float | None = None,
+        rtol: float | None = None,
+        msg: str | None = None,
     ) -> None:
         """Assert function(func, a, b) == function(func, b, a) within tolerance.
 
index 4f8fc09d2c3425945ae98291e1df722c41270db7..30042411d262a548d8ff8ab258f365f4faf8d90b 100644 (file)
@@ -5,6 +5,7 @@ reducing duplication while maintaining full functional coverage for mathematical
 """
 
 import math
+from typing import ClassVar
 
 import pytest
 
@@ -20,8 +21,8 @@ class TestTransforms(RewardSpaceTestBase):
     """Comprehensive transform function tests with parameterized scenarios."""
 
     # Transform function test data
-    SMOOTH_TRANSFORMS = [t for t in ALLOWED_TRANSFORMS if t != "clip"]
-    ALL_TRANSFORMS = list(ALLOWED_TRANSFORMS)
+    SMOOTH_TRANSFORMS: ClassVar[list[str]] = [t for t in ALLOWED_TRANSFORMS if t != "clip"]
+    ALL_TRANSFORMS: ClassVar[list[str]] = list(ALLOWED_TRANSFORMS)
 
     def test_transform_exact_values(self):
         """Test transform functions produce exact expected values for specific inputs."""
@@ -34,14 +35,14 @@ class TestTransforms(RewardSpaceTestBase):
             ("asinh", [0.0], [0.0]),  # More complex calculations tested separately
             # arctan transform: (2/pi) * arctan(x) in (-1, 1)
             ("arctan", [0.0, 1.0], [0.0, 2.0 / math.pi * math.atan(1.0)]),
-            # sigmoid transform: 2σ(x) - 1, σ(x) = 1/(1 + e^(-x)) in (-1, 1)
+            # sigmoid transform: 2σ(x) - 1, σ(x) = 1/(1 + e^(-x)) in (-1, 1)  # noqa: RUF003
             ("sigmoid", [0.0], [0.0]),  # More complex calculations tested separately
             # clip transform: clip(x, -1, 1) in [-1, 1]
             ("clip", [0.0, 0.5, 2.0, -2.0], [0.0, 0.5, 1.0, -1.0]),
         ]
 
         for transform_name, test_values, expected_values in test_cases:
-            for test_val, expected_value in zip(test_values, expected_values):
+            for test_val, expected_value in zip(test_values, expected_values, strict=False):
                 with self.subTest(
                     transform=transform_name, input=test_val, expected=expected_value
                 ):
index fbd2d3c7d512420d28a7781f3670ebd9c94ee712..505c04fcbf05c6876be9034419c8aaa4dbb0233f 100644 (file)
@@ -343,12 +343,11 @@ source = { editable = "." }
 dependencies = [
     { name = "numpy" },
     { name = "pandas" },
-    { name = "pytest" },
     { name = "scikit-learn" },
     { name = "scipy" },
 ]
 
-[package.dev-dependencies]
+[package.optional-dependencies]
 dev = [
     { name = "pytest" },
     { name = "pytest-cov" },
@@ -359,17 +358,13 @@ dev = [
 requires-dist = [
     { name = "numpy", specifier = ">=1.26" },
     { name = "pandas" },
-    { name = "pytest" },
+    { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" },
+    { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=7.0" },
+    { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8" },
     { name = "scikit-learn" },
     { name = "scipy", specifier = ">=1.11" },
 ]
-
-[package.metadata.requires-dev]
-dev = [
-    { name = "pytest", specifier = ">=6.0" },
-    { name = "pytest-cov", specifier = ">=7.0.0" },
-    { name = "ruff" },
-]
+provides-extras = ["dev"]
 
 [[package]]
 name = "ruff"
index 83c3ea6e05565ac43cb75c17c0528e3b1de52795..897474128e4fdf5828eaa556af6036ea7e1460ae 100644 (file)
@@ -2033,9 +2033,7 @@ class MyRLEnv(Base5ActionRLEnv):
         if require_position and position not in (Positions.Long, Positions.Short):
             return 0.0
 
-        duration_ratio = 0.0 if duration_ratio < 0.0 else duration_ratio
-        if duration_ratio > 1.0:
-            duration_ratio = 1.0
+        duration_ratio = max(0.0, duration_ratio)
 
         try:
             pnl_ratio = pnl / pnl_target
@@ -2295,7 +2293,7 @@ class MyRLEnv(Base5ActionRLEnv):
 
         **State Variables:**
             r_pnl         : pnl / pnl_target (PnL ratio)
-            r_dur         : duration / max_duration (duration ratio, clamp [0,1])
+            r_dur         : duration / max_duration (duration ratio, max 0)
             scale         : scale parameter
             g             : gain parameter
             T_x           : transform function (tanh, softsign, etc.)