From: Jérôme Benoit
Date: Tue, 14 Oct 2025 15:30:41 +0000 (+0200)
Subject: perf(reforcexy): make exit reward risk/reward ratio aware
X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=9e3b87961c38db5f9997c9006881b0473d1092b1;p=freqai-strategies.git

perf(reforcexy): make exit reward risk/reward ratio aware

Signed-off-by: Jérôme Benoit
---

diff --git a/ReforceXY/reward_space_analysis/README.md b/ReforceXY/reward_space_analysis/README.md
index 36cd9a9..f75b33d 100644
--- a/ReforceXY/reward_space_analysis/README.md
+++ b/ReforceXY/reward_space_analysis/README.md
@@ -12,7 +12,7 @@ This tool helps you understand and validate how the ReforceXY reinforcement lear

- ✅ Generate thousands of synthetic trading scenarios deterministically
- ✅ Analyze reward distribution, feature importance & partial dependence
-- ✅ Built‑in invariant & statistical validation layers (fail‑fast)
+- ✅ Built-in invariant & statistical validation layers (fail-fast)
- ✅ PBRS (Potential-Based Reward Shaping) integration with canonical invariance
- ✅ Export reproducible artifacts (parameter hash + execution manifest)
- ✅ Compare synthetic vs real trading data (distribution shift metrics)
@@ -278,10 +278,10 @@ _Exit attenuation configuration:_

- `exit_attenuation_mode` (default: linear) - Selects attenuation kernel (see table below: legacy|sqrt|linear|power|half_life). Fallback to linear.
- `exit_plateau` (default: true) - Enables plateau (no attenuation until `exit_plateau_grace`).
-- `exit_plateau_grace` (default: 1.0) - Duration ratio boundary of full‑strength region (may exceed 1.0).
+- `exit_plateau_grace` (default: 1.0) - Duration ratio boundary of full-strength region (may exceed 1.0).
- `exit_linear_slope` (default: 1.0) - Slope parameter used only when mode = linear.
- `exit_power_tau` (default: 0.5) - Tau ∈ (0,1]; internally mapped to alpha (see kernel table).
-- `exit_half_life` (default: 0.5) - Half‑life parameter for the half_life kernel.
+- `exit_half_life` (default: 0.5) - Half-life parameter for the half_life kernel.
- `exit_factor_threshold` (default: 10000.0) - Warning-only soft threshold (emits RuntimeWarning; no capping).

Attenuation kernels:

@@ -294,8 +294,8 @@ effective_r = r - grace if exit_plateau and r > grace
effective_r = r if not exit_plateau
```

-| Mode | Multiplier (applied to base_factor * pnl * pnl_factor * efficiency_factor) | Monotonic ↓ | Notes |
-|------|---------------------------------------------------------------------|-------------|-------|
+| Mode | Multiplier (applied to base_factor * pnl * pnl_factor * efficiency_factor) | Monotonic decreasing (Yes/No) | Notes |
+|------|---------------------------------------------------------------------|-------------------------------|-------|
| legacy | step: ×1.5 if r* ≤ 1 else ×0.5 | No | Historical discontinuity retained (not smoothed) |
| sqrt | 1 / sqrt(1 + r*) | Yes | Sub-linear decay |
| linear | 1 / (1 + slope * r*) | Yes | slope = `exit_linear_slope` (≥0) |
@@ -357,12 +357,36 @@ _Invariant / safety controls:_

- Use this if you want to generate multiple independent statistical analyses over the same synthetic dataset without re-simulating samples.
- If omitted, falls back to `--seed` for full run determinism.
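+
+A minimal sketch of this fallback (illustrative only; `resolve_stats_rng` is a hypothetical name, the script inlines the equivalent logic around `np.random.default_rng`):
+
+```python
+import numpy as np
+
+def resolve_stats_rng(seed: int, stats_seed: int | None = None) -> np.random.Generator:
+    """Local RNG for the statistical layer; never mutates NumPy's global RNG state."""
+    # Fall back to the simulation seed so a single --seed keeps the whole run deterministic.
+    return np.random.default_rng(stats_seed if stats_seed is not None else seed)
+```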
+**`--strict_diagnostics`** (flag, default: disabled)
+
+Fail-fast switch controlling handling of degenerate statistical situations:
+
+| Condition | Graceful (default) | Strict (`--strict_diagnostics`) |
+|-----------|--------------------|---------------------------------|
+| Zero-width bootstrap CI | Widen by epsilon (~1e-9) + warning | Abort (AssertionError) |
+| NaN skewness/kurtosis (constant distribution) | Replace with 0.0 + warning | Abort |
+| NaN Anderson statistic (constant distribution) | Replace with 0.0 + warning | Abort |
+| NaN Q-Q R² (constant distribution) | Replace with 1.0 + warning | Abort |
+
+Use strict mode in CI or research contexts requiring hard guarantees; keep the default for exploratory analysis to avoid aborting entire runs on trivial constants.
+
+**`--bootstrap_resamples`** (int, default: 10000)
+
+- Number of bootstrap resamples used for confidence intervals (percentile method).
+- Lower values (< 500) yield coarse intervals; a warning (`RewardDiagnosticsWarning`) is emitted below the internal recommended minimum (currently 200), which is kept deliberately low so that very fast exploratory runs remain possible.
+- Increase for more stable interval endpoints (typical: 5000–20000). Runtime scales roughly linearly.
+
+**`--skip_partial_dependence`** (flag, default: disabled)
+
+- When set, skips computation and export of partial dependence CSV files, reducing runtime (often 30–60% faster for large sample sizes) at the cost of losing marginal response curve inspection.
+- Feature importance (RandomForest Gini importance + permutation importance) is still computed.
+
### Reproducibility Model

| Component | Controlled By | Notes |
|-----------|---------------|-------|
| Sample simulation | `--seed` | Drives action sampling, PnL noise generation. |
-| Statistical tests / bootstrap | `--stats_seed` (fallback `--seed`) | Local RNG; isolation prevents side‑effects in user code. |
+| Statistical tests / bootstrap | `--stats_seed` (fallback `--seed`) | Local RNG; isolation prevents side-effects in user code. |
| RandomForest & permutation importance | `--seed` | Ensures identical splits and tree construction. |
| Partial dependence grids | Deterministic | Depends only on fitted model & data. |

@@ -432,17 +456,19 @@ The analysis generates the following output files:

**`statistical_analysis.md`** - Comprehensive statistical analysis containing:

-- **Global Statistics** - Reward distributions and component activation rates
-- **Sample Representativity** - Coverage of critical market scenarios
-- **Component Analysis** - Relationships between rewards and conditions
-- **PBRS Analysis** - Potential-based reward shaping component activation rates, statistics, and invariance validation
-- **Feature Importance** - Machine learning analysis of key drivers
-- **Statistical Validation** - Hypothesis tests, confidence intervals, normality + effect sizes
-- **Distribution Shift** - Real vs synthetic divergence (KL, JS, Wasserstein, KS)
-- **Diagnostics Validation Summary**
-  - Pass/fail snapshot of all runtime checks
-  - Consolidated pass/fail state of every validation layer (invariants, parameter bounds, bootstrap CIs, distribution metrics, diagnostics, hypothesis tests)
-  - PBRS invariance validation (canonical mode check: ∑shaping_rewards ≈ 0)
+1. **Global Statistics** - Reward distribution, per-action stats, component activation & ranges.
+2. **Sample Representativity** - Position/action distributions, critical regime coverage, component activation recap.
+3. 
**Reward Component Analysis** - Binned relationships (idle, hold, exit), correlation matrix (constant features removed), PBRS analysis (activation rates, component stats, invariance summary). +4. **Feature Importance** - Random Forest importance + partial dependence. +5. **Statistical Validation** - Hypothesis tests, bootstrap confidence intervals, normality diagnostics, optional distribution shift (5.4) when real episodes provided. +**Summary** - 7-point concise synthesis: +1. Reward distribution health (center, spread, tail asymmetry) +2. Action & position coverage (usage %, invalid rate, masking efficacy) +3. Component contributions (activation rates + mean / |mean| ranking) +4. Exit attenuation behavior (mode, continuity, effective decay characteristics) +5. Feature signal quality (model R², leading predictors, stability notes) +6. Statistical outcomes (significant correlations / tests, any multiple-testing adjustment applied, distribution shift if real data) +7. PBRS invariance verdict (|Σ shaping| < 1e-6 => canonical; otherwise non-canonical with absolute deviation) ### Data Exports @@ -451,41 +477,40 @@ The analysis generates the following output files: | `reward_samples.csv` | Raw synthetic samples for custom analysis | | `feature_importance.csv` | Feature importance rankings from random forest model | | `partial_dependence_*.csv` | Partial dependence data for key features | -| `manifest.json` | Run metadata (seed, params, top features, overrides) | +| `manifest.json` | Runtime manifest (simulation + reward params + hash) | ### Manifest Structure (`manifest.json`) -Key fields: - -| Field | Description | -|-------|-------------| -| `generated_at` | ISO timestamp of run | -| `num_samples` | Number of synthetic samples generated | -| `seed` | Random seed used (deterministic cascade) | -| `profit_target_effective` | Profit target after risk/reward scaling | -| `top_features` | Top 5 features by permutation importance | -| `reward_param_overrides` | Subset of reward tunables explicitly supplied via CLI | -| `params_hash` | SHA-256 hash combining simulation params + overrides (reproducibility) | -| `params` | Echo of core simulation parameters (subset, for quick audit) | -| `parameter_adjustments` | Any automatic bound clamps applied by `validate_reward_parameters` | - -Use `params_hash` to verify reproducibility across runs; identical seeds + identical overrides ⇒ identical hash. +| Field | Type | Description | +|-------|------|-------------| +| `generated_at` | string (ISO 8601) | Timestamp of generation (not part of hash). | +| `num_samples` | int | Number of synthetic samples generated. | +| `seed` | int | Master random seed driving simulation determinism. | +| `max_trade_duration` | int | Max trade duration used to scale durations. | +| `profit_target_effective` | float | Profit target after risk/reward scaling. | +| `pvalue_adjust_method` | string | Multiple testing correction mode (`none` or `benjamini_hochberg`). | +| `parameter_adjustments` | object | Map of any automatic bound clamps (empty if none). | +| `reward_params` | object | Full resolved reward parameter set (post-validation). | +| `simulation_params` | object | All simulation inputs (num_samples, seed, volatility knobs, etc.). | +| `params_hash` | string (sha256) | Hash over ALL `simulation_params` + ALL `reward_params` (lexicographically ordered). | + +Reproducibility: two runs are input-identical iff their `params_hash` values match. 
Because defaults are included in the hash, modifying a default value (even if not overridden) changes the hash. ### Distribution Shift Metric Conventions | Metric | Definition | Notes | |--------|------------|-------| | `*_kl_divergence` | KL(synthetic‖real) = Σ p_synth log(p_synth / p_real) | Asymmetric; 0 iff identical histograms (after binning). | -| `*_js_distance` | √(JS(p_synth, p_real)) | Symmetric, bounded [0,1]; distance form (sqrt of JS divergence). | +| `*_js_distance` | d_JS(p_synth, p_real) = √( 0.5 KL(p_synth‖m) + 0.5 KL(p_real‖m) ), m = 0.5 (p_synth + p_real) | Symmetric, bounded [0,1]; square-root of JS divergence; stable vs KL when supports differ. | | `*_wasserstein` | 1D Earth Mover's Distance | Non-negative; same units as feature. | | `*_ks_statistic` | KS two-sample statistic | ∈ [0,1]; higher = greater divergence. | | `*_ks_pvalue` | KS test p-value | ∈ [0,1]; small ⇒ reject equality (at α). | Implementation details: - Histograms: 50 uniform bins spanning min/max across both samples. -- Probabilities: counts + ε (1e‑10) then normalized ⇒ avoids log(0) and division by zero. -- Degenerate (constant) distributions short‑circuit to zeros (divergences) / p-value 1.0. -- JS distance is reported (not raw divergence) for bounded interpretability. +- Probabilities: counts + ε (1e-10) then normalized ⇒ avoids log(0) and division by zero. +- Degenerate distributions short-circuit to zeros / p-value 1.0. +- JS distance instead of raw JS divergence for bounded interpretability and smooth interpolation. --- @@ -531,7 +556,7 @@ python reward_space_analysis.py \ --output real_vs_synthetic ``` -The report will include distribution shift metrics (KL divergence ≥ 0, JS distance ∈ [0,1], Wasserstein ≥ 0, KS statistic ∈ [0,1], KS p‑value ∈ [0,1]) showing how well synthetic samples represent real trading. Degenerate (constant) distributions are auto‑detected and produce zero divergence and KS p‑value = 1.0 to avoid spurious instability. +The report will include distribution shift metrics (KL divergence ≥ 0, JS distance ∈ [0,1], Wasserstein ≥ 0, KS statistic ∈ [0,1], KS p-value ∈ [0,1]) showing how well synthetic samples represent real trading. Degenerate (constant) distributions are auto-detected and produce zero divergence and KS p-value = 1.0 to avoid spurious instability. 
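+
+For intuition, a self-contained sketch of the documented JS-distance convention (50 uniform bins, counts + ε = 1e-10 then normalization, distance = square root of the divergence; base-2 logarithms are assumed here for the [0,1] bound, and the script's internal helper may differ in detail):
+
+```python
+import numpy as np
+
+def js_distance(synth: np.ndarray, real: np.ndarray, bins: int = 50) -> float:
+    lo, hi = min(synth.min(), real.min()), max(synth.max(), real.max())
+    if lo == hi:
+        return 0.0  # degenerate (constant) distributions short-circuit to zero
+    edges = np.linspace(lo, hi, bins + 1)  # uniform bins spanning both samples
+    p, _ = np.histogram(synth, bins=edges)
+    q, _ = np.histogram(real, bins=edges)
+    eps = 1e-10
+    p = (p + eps) / (p + eps).sum()  # counts + epsilon, then normalize
+    q = (q + eps) / (q + eps).sum()
+    m = 0.5 * (p + q)
+
+    def kl(a: np.ndarray, b: np.ndarray) -> float:
+        return float(np.sum(a * np.log2(a / b)))
+
+    js_divergence = 0.5 * kl(p, m) + 0.5 * kl(q, m)
+    return float(np.sqrt(max(js_divergence, 0.0)))
+```
+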
### Batch Analysis @@ -571,11 +596,13 @@ Always run the full suite after modifying reward logic or attenuation parameters | Statistical Validation | TestStatisticalValidation | Mathematical bounds, heteroscedasticity, invariants | | Boundary Conditions | TestBoundaryConditions | Extreme params & unknown mode fallback | | Helper Functions | TestHelperFunctions | Report writers, model analysis, utility conversions | -| Private Functions (via public API) | TestPrivateFunctions | Idle / hold / invalid penalties, exit scenarios | +| Private Functions | TestPrivateFunctions | Idle / hold / invalid penalties, exit scenarios (accessed indirectly) | | Robustness | TestRewardRobustness | Monotonic attenuation (where applicable), decomposition integrity, boundary regimes | | Parameter Validation | TestParameterValidation | Bounds clamping, warning threshold, penalty power scaling | -| Continuity | TestContinuityPlateau | Plateau boundary continuity & small‑epsilon attenuation scaling | +| Continuity | TestContinuityPlateau | Plateau boundary continuity & small-epsilon attenuation scaling | | PBRS Integration | TestPBRSIntegration | Potential-based reward shaping, transforms, exit modes, canonical invariance | +| Report Formatting | TestReportFormatting | Report section presence, ordering, PBRS invariance line, formatting integrity | +| Load Real Episodes | TestLoadRealEpisodes | Real episodes ingestion, column validation, distribution shift preparation | ### Test Architecture @@ -654,6 +681,8 @@ pytest -q test_reward_space_analysis.py::TestRewardAlignment - Use `--trading_mode spot` (fewer action combinations) - Close other memory-intensive applications - Use SSD storage for faster I/O +- Use `--skip_partial_dependence` to skip marginal response curves +- Temporarily lower `--bootstrap_resamples` (e.g. 1000) during iteration (expect wider CIs) ### Memory Errors diff --git a/ReforceXY/reward_space_analysis/reward_space_analysis.py b/ReforceXY/reward_space_analysis/reward_space_analysis.py index a733a05..c7752c8 100644 --- a/ReforceXY/reward_space_analysis/reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/reward_space_analysis.py @@ -58,6 +58,35 @@ class Positions(Enum): Neutral = 0.5 +# Tolerance for PBRS invariance classification (canonical if |Σ shaping| < PBRS_INVARIANCE_TOL) +PBRS_INVARIANCE_TOL: float = 1e-6 +# Default discount factor γ for potential-based reward shaping +POTENTIAL_GAMMA_DEFAULT: float = 0.95 + +# Attenuation mode sets (centralized for tests and validation) +ATTENUATION_MODES: Tuple[str, ...] = ("sqrt", "linear", "power", "half_life") +ATTENUATION_MODES_WITH_LEGACY: Tuple[str, ...] = ATTENUATION_MODES + ("legacy",) + +# Centralized internal numeric guards & behavior toggles (single source of truth for internal tunables) +INTERNAL_GUARDS: dict[str, float] = { + "degenerate_ci_epsilon": 1e-9, + "distribution_constant_fallback_moment": 0.0, + "distribution_constant_fallback_qq_r2": 1.0, + "moment_extreme_threshold": 1e4, + "bootstrap_min_recommended": 200, +} + + +class RewardDiagnosticsWarning(RuntimeWarning): + """Warning category for reward space diagnostic graceful fallbacks. + + Enables selective filtering in tests or CLI harness while leaving other + runtime warnings unaffected. 
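+
+    Example (illustrative)::
+
+        import warnings
+
+        # Escalate graceful fallbacks to hard errors, e.g. in a CI test run:
+        warnings.filterwarnings("error", category=RewardDiagnosticsWarning)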
+ """ + + pass + + def _to_bool(value: Any) -> bool: if isinstance(value, bool): return value @@ -130,14 +159,11 @@ RewardParams = Dict[str, RewardParamValue] # Internal safe fallback helper for numeric failures (centralizes semantics) def _fail_safely(reason: str) -> float: """Return 0.0 on recoverable numeric failure (reason available for future debug hooks).""" - # NOTE: presently silent to preserve legacy behaviour; hook logging here if needed. + # NOTE: presently silent to preserve legacy behavior; hook logging here if needed. _ = reason return 0.0 -# Allowed exit attenuation modes -ALLOWED_EXIT_MODES = {"legacy", "sqrt", "linear", "power", "half_life"} - # PBRS constants ALLOWED_TRANSFORMS = { "tanh", @@ -185,7 +211,7 @@ DEFAULT_MODEL_REWARD_PARAMETERS: RewardParams = { # === PBRS PARAMETERS === # Potential-based reward shaping core parameters # Discount factor γ for potential term (0 ≤ γ ≤ 1) - "potential_gamma": 0.95, + "potential_gamma": POTENTIAL_GAMMA_DEFAULT, "potential_softsign_sharpness": 1.0, # Exit potential modes: canonical | progressive_release | spike_cancel | retain_previous "exit_potential_mode": "canonical", @@ -368,7 +394,7 @@ def validate_reward_parameters( def _normalize_and_validate_mode(params: RewardParams) -> None: """Align normalization of ``exit_attenuation_mode`` with ReforceXY environment. - Behaviour (mirrors in-env logic): + Behavior (mirrors in-env logic): - Do not force lowercase or strip user formatting; use the value as provided. - Supported modes (case-sensitive): {legacy, sqrt, linear, power, half_life}. - If the value is not among supported keys, silently fallback to 'linear' @@ -380,7 +406,7 @@ def _normalize_and_validate_mode(params: RewardParams) -> None: return exit_attenuation_mode = _get_str_param(params, "exit_attenuation_mode", "linear") - if exit_attenuation_mode not in ALLOWED_EXIT_MODES: + if exit_attenuation_mode not in ATTENUATION_MODES_WITH_LEGACY: params["exit_attenuation_mode"] = "linear" @@ -404,7 +430,7 @@ def add_tunable_cli_args(parser: argparse.ArgumentParser) -> None: parser.add_argument( f"--{key}", type=str, - choices=sorted(ALLOWED_EXIT_MODES), + choices=sorted(ATTENUATION_MODES_WITH_LEGACY), default=None, help=help_text, ) @@ -495,21 +521,50 @@ def _get_exit_factor( duration_ratio: float, params: RewardParams, ) -> float: - """Compute exit factor = time attenuation kernel (with optional plateau) * ``pnl_factor``. + """Compute exit factor controlling time attenuation of exit reward. + + Purpose + ------- + Produces a multiplicative factor applied to raw PnL at exit: + exit_reward = pnl * exit_factor + where: + exit_factor = time_kernel(base_factor, effective_duration_ratio) * pnl_factor - Parity: mirrors the live environment's logic (``ReforceXY._get_exit_factor``). + Parity + ------ + Mirrors environment method ``ReforceXY._get_exit_factor`` for offline / synthetic analysis. + + Algorithm + --------- + 1. Validate finiteness & clamp negative duration to 0. + 2. Apply optional plateau: effective_dr = 0 while duration_ratio <= grace else (dr - grace). + 3. Select kernel: legacy | sqrt | linear | power | half_life (all monotonic except legacy discontinuity at dr=1). + 4. Multiply by externally supplied ``pnl_factor`` (profit & efficiency modulation). + 5. Invariants & safety: non-finite -> 0; prevent negative factor on non-negative pnl; warn on large magnitude. - Assumptions: - - ``_normalize_and_validate_mode`` has already run (invalid modes replaced by 'linear'). 
- - ``exit_attenuation_mode`` is therefore either a member of ``ALLOWED_EXIT_MODES`` or 'linear'. - - All numeric tunables are accessed through ``_get_float_param`` for safety. + Parameters + ---------- + base_factor : float + Base scaling constant before temporal attenuation. + pnl : float + Realized (or unrealized at exit decision) profit/loss. + pnl_factor : float + PnL modulation factor (win amplification + efficiency) computed separately. + duration_ratio : float + trade_duration / max_trade_duration (pre-clamped upstream). + params : dict + Reward parameter mapping. - Algorithm steps: - 1. Finiteness & non-negative guard on inputs. - 2. Plateau handling: effective duration ratio = 0 within grace region else (r - grace). - 3. Kernel application (legacy|sqrt|linear|power|half_life). - 4. Multiply by externally supplied ``pnl_factor`` (already includes profit & efficiency effects). - 5. Invariants: ensure finiteness; clamp negative factor when pnl >= 0; emit threshold warning. + Returns + ------- + float + Exit factor (>=0 unless pnl < 0 and invariants disabled). + + Notes + ----- + - Legacy kernel is discontinuous and maintained for backward compatibility only. + - Combine with ``_get_pnl_factor`` for full exit reward shaping (non-PBRS, path dependent). + - Plateau introduces a derivative kink at the grace boundary. """ # Basic finiteness checks if ( @@ -577,6 +632,14 @@ def _get_exit_factor( kernel = kernels.get(exit_attenuation_mode, None) if kernel is None: + warnings.warn( + ( + f"Unknown exit_attenuation_mode '{exit_attenuation_mode}'; defaulting to 'linear' " + f"(effective_dr={effective_dr:.5f})" + ), + RuntimeWarning, + stacklevel=2, + ) kernel = _linear_kernel try: @@ -618,30 +681,87 @@ def _get_exit_factor( def _get_pnl_factor( - params: RewardParams, context: RewardContext, profit_target: float + params: RewardParams, + context: RewardContext, + profit_target: float, + risk_reward_ratio: float, ) -> float: - """Env-aligned PnL factor combining profit amplification and exit efficiency.""" - pnl = context.pnl + """Compute PnL amplification factor (profit target over/under performance + efficiency). - if not np.isfinite(pnl) or not np.isfinite(profit_target): - return _fail_safely("non_finite_pnl_or_target") + Purpose + ------- + Encapsulates profit overshoot bonus and controlled loss penalty plus an efficiency tilt + based on intra-trade utilization of observed profit range. - profit_target_factor = 1.0 - if profit_target > 0.0 and pnl > profit_target: - win_reward_factor = _get_float_param( - params, - "win_reward_factor", - DEFAULT_MODEL_REWARD_PARAMETERS.get("win_reward_factor", 2.0), - ) - pnl_factor_beta = _get_float_param( - params, - "pnl_factor_beta", - DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5), - ) - pnl_ratio = pnl / profit_target - profit_target_factor = 1.0 + win_reward_factor * math.tanh( - pnl_factor_beta * (pnl_ratio - 1.0) - ) + Parity + ------ + Mirrors environment method ``MyRLEnv._get_pnl_factor``. + + Algorithm + --------- + 1. Compute pnl_ratio = pnl / profit_target (profit_target already includes RR upstream). + 2. If |pnl_ratio| <= 1: pnl_target_factor = 1.0. + 3. Else compute base = tanh(beta * (|pnl_ratio| - 1)). + a. Gain branch (pnl_ratio > 1): 1 + win_reward_factor * base + b. Loss branch (pnl_ratio < -1/rr): 1 + (win_reward_factor * rr) * base + 4. Efficiency: derive efficiency_ratio within [0,1] from intra-trade min/max; add linear tilt + around efficiency_center scaled by efficiency_weight (sign-flipped for losses). + 5. 
Return max(0, pnl_target_factor * efficiency_factor). + + Parameters + ---------- + params : dict + Reward parameter mapping. + context : RewardContext + Current PnL and intra-trade extrema. + profit_target : float + Profit objective (already RR-scaled when called from calculate_reward). + risk_reward_ratio : float + RR used to set asymmetric loss trigger threshold (pnl_ratio < -1/RR). + + Returns + ------- + float + Non-negative factor (0 if invalid inputs or degenerate target). + + Notes + ----- + - Symmetric tanh avoids unbounded amplification. + - Loss penalty magnitude scaled by RR to keep incentive structure consistent across setups. + - Efficiency tilt introduces path dependence (non-PBRS component). + """ + pnl = context.pnl + if ( + not np.isfinite(pnl) + or not np.isfinite(profit_target) + or not np.isfinite(risk_reward_ratio) + ): + return _fail_safely("non_finite_inputs_pnl_factor") + if profit_target <= 0.0: + return 0.0 + + win_reward_factor = _get_float_param( + params, + "win_reward_factor", + DEFAULT_MODEL_REWARD_PARAMETERS.get("win_reward_factor", 2.0), + ) + pnl_factor_beta = _get_float_param( + params, + "pnl_factor_beta", + DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5), + ) + rr = risk_reward_ratio if risk_reward_ratio > 0 else 1.0 + + pnl_ratio = pnl / profit_target + pnl_target_factor = 1.0 + if abs(pnl_ratio) > 1.0: + # Environment uses tanh(beta * (abs(ratio)-1)) for magnitude beyond target. + base_pnl_target_factor = math.tanh(pnl_factor_beta * (abs(pnl_ratio) - 1.0)) + if pnl_ratio > 1.0: + pnl_target_factor = 1.0 + win_reward_factor * base_pnl_target_factor + elif pnl_ratio < -(1.0 / rr): + loss_penalty_factor = win_reward_factor * rr + pnl_target_factor = 1.0 + loss_penalty_factor * base_pnl_target_factor efficiency_factor = 1.0 efficiency_weight = _get_float_param( @@ -669,7 +789,7 @@ def _get_pnl_factor( efficiency_center - efficiency_ratio ) - return max(0.0, profit_target_factor * efficiency_factor) + return max(0.0, pnl_target_factor * efficiency_factor) def _is_valid_action( @@ -694,7 +814,7 @@ def _is_valid_action( def _idle_penalty( context: RewardContext, idle_factor: float, params: RewardParams ) -> float: - """Mirror the environment's idle penalty behaviour.""" + """Mirror the environment's idle penalty behavior.""" idle_penalty_scale = _get_float_param( params, "idle_penalty_scale", @@ -734,7 +854,7 @@ def _idle_penalty( def _hold_penalty( context: RewardContext, hold_factor: float, params: RewardParams ) -> float: - """Mirror the environment's hold penalty behaviour.""" + """Mirror the environment's hold penalty behavior.""" hold_penalty_scale = _get_float_param( params, "hold_penalty_scale", @@ -806,11 +926,14 @@ def calculate_reward( params, "risk_reward_ratio", float(risk_reward_ratio) ) - # Scale profit target by risk-reward ratio (reward multiplier) - # E.g., profit_target=0.03, RR=2.0 → profit_target_final=0.06 profit_target_final = profit_target * risk_reward_ratio idle_factor = factor * profit_target_final / 4.0 - pnl_factor = _get_pnl_factor(params, context, profit_target_final) + pnl_factor = _get_pnl_factor( + params, + context, + profit_target_final, + risk_reward_ratio, + ) hold_factor = idle_factor # Base reward calculation (existing logic) @@ -1191,6 +1314,37 @@ def _binned_stats( target: str, bins: Iterable[float], ) -> pd.DataFrame: + """Compute aggregated statistics of a target variable across value bins. 
+ + Purpose + ------- + Provide consistent binned descriptive statistics (count, mean, std, min, max) + for exploratory diagnostics of reward component relationships. + + Parameters + ---------- + df : pd.DataFrame + Source dataframe containing at minimum the ``column`` and ``target`` fields. + column : str + Name of the column whose values are used to create bins. + target : str + Column whose statistics are aggregated per bin. + bins : Iterable[float] + Monotonic sequence of bin edges (len >= 2). Values outside the range are + clipped to the boundary edges prior to bin assignment. + + Returns + ------- + pd.DataFrame + Dataframe indexed by stringified interval with columns: + ``count``, ``mean``, ``std``, ``min``, ``max``. + + Notes + ----- + - Duplicate bin edges are dropped via pandas ``cut(duplicates='drop')``. + - Non-finite or missing bins after clipping are excluded prior to grouping. + - This helper is deterministic and side-effect free. + """ bins_arr = np.asarray(list(bins), dtype=float) if bins_arr.ndim != 1 or bins_arr.size < 2: raise ValueError("bins must contain at least two edges") @@ -1214,7 +1368,34 @@ def _binned_stats( def _compute_relationship_stats( df: pd.DataFrame, max_trade_duration: int ) -> Dict[str, Any]: - """Compute relationship statistics without writing to file.""" + """Compute binned relationship statistics among core reward drivers. + + Purpose + ------- + Generate uniformly binned summaries for idle duration, trade duration and + realized PnL to facilitate downstream comparative or visual analyses. + + Parameters + ---------- + df : pd.DataFrame + Input dataframe containing ``idle_duration``, ``trade_duration``, ``pnl`` and + corresponding reward component columns (``reward_idle``, ``reward_hold``, + ``reward_exit``). + max_trade_duration : int + Maximum configured trade duration used to scale bin ranges. + + Returns + ------- + Dict[str, Any] + Dictionary with keys ``idle_stats``, ``hold_stats`` and ``exit_stats`` each + containing a binned statistics dataframe. + + Notes + ----- + - PnL bin upper bound is adjusted by a tiny epsilon when min ≈ max to avoid + degenerate intervals. + - All statistics are rounded to 6 decimal places for compactness. 
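+    - Illustrative scale: with ``max_trade_duration=128`` the duration bins are
+      ``np.linspace(0, 384.0, 13)``, i.e. twelve uniform 32-step bins (see the
+      implementation just below).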
+ """ idle_bins = np.linspace(0, max_trade_duration * 3.0, 13) trade_bins = np.linspace(0, max_trade_duration * 3.0, 13) pnl_min = float(df["pnl"].min()) @@ -1241,13 +1422,23 @@ def _compute_relationship_stats( "trade_duration", "idle_duration", ] - correlation = df[correlation_fields].corr().round(4) + # Drop columns that are constant (std == 0) to avoid all-NaN correlation rows + numeric_subset = df[correlation_fields] + constant_cols = [ + c for c in numeric_subset.columns if numeric_subset[c].nunique() <= 1 + ] + if constant_cols: + filtered = numeric_subset.drop(columns=constant_cols) + else: + filtered = numeric_subset + correlation = filtered.corr().round(4) return { "idle_stats": idle_stats, "hold_stats": hold_stats, "exit_stats": exit_stats, "correlation": correlation, + "correlation_dropped": constant_cols, } @@ -1297,7 +1488,7 @@ def _compute_representativity_stats( def _perform_feature_analysis( - df: pd.DataFrame, seed: int + df: pd.DataFrame, seed: int, *, skip_partial_dependence: bool = False ) -> Tuple[ pd.DataFrame, Dict[str, Any], Dict[str, pd.DataFrame], RandomForestRegressor ]: @@ -1327,7 +1518,7 @@ def _perform_feature_analysis( X = df[feature_cols] for col in ("trade_duration", "idle_duration"): if col in X.columns and pd.api.types.is_integer_dtype(X[col]): - X[col] = X[col].astype(float) + X.loc[:, col] = X[col].astype(float) y = df["reward_total"] X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.25, random_state=seed @@ -1365,22 +1556,23 @@ def _perform_feature_analysis( .reset_index(drop=True) ) - # Compute partial dependence for key features + # Compute partial dependence for key features unless skipped partial_deps = {} - for feature in ["trade_duration", "idle_duration", "pnl"]: - pd_result = partial_dependence( - model, - X_test, - [feature], - grid_resolution=50, - kind="average", - ) - value_key = "values" if "values" in pd_result else "grid_values" - values = pd_result[value_key][0] - averaged = pd_result["average"][0] - partial_deps[feature] = pd.DataFrame( - {feature: values, "partial_dependence": averaged} - ) + if not skip_partial_dependence: + for feature in ["trade_duration", "idle_duration", "pnl"]: + pd_result = partial_dependence( + model, + X_test, + [feature], + grid_resolution=50, + kind="average", + ) + value_key = "values" if "values" in pd_result else "grid_values" + values = pd_result[value_key][0] + averaged = pd_result["average"][0] + partial_deps[feature] = pd.DataFrame( + {feature: values, "partial_dependence": averaged} + ) analysis_stats = { "r2_score": r2, @@ -1395,24 +1587,47 @@ def _perform_feature_analysis( def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFrame: - """Load transitions from a pickle into a pandas.DataFrame. - - Accepted inputs: a pickled DataFrame, a list of transition dicts, a list of - episode dicts each containing a 'transitions' iterable, or a dict with key - 'transitions'. + """Load serialized episodes / transitions into a normalized DataFrame. + + Accepted payload topologies (pickled): + - DataFrame directly. + - Dict with key ``'transitions'`` whose value is: + * a DataFrame OR + * an iterable of transition dicts. + - List of episode dicts where each dict with key ``'transitions'`` provides: + * a DataFrame OR + * an iterable of transition dicts. + - Fallback: any object convertible to ``pd.DataFrame`` (best‑effort). + + Normalization steps: + 1. Flatten nested episode structures. + 2. 
Coerce numeric columns (expected + optional) with safe NaN coercion tracking. + 3. Ensure required columns exist (raise or fill with NaN depending on ``enforce_columns``). + 4. Add missing optional numeric columns (set to NaN) to obtain a stable schema. + 5. (Light) Deduplication: drop exact duplicate transition rows if any. + + Security NOTE: + Pickle loading executes arbitrary code if the file is malicious. Only load + trusted artifacts. This helper assumes the calling context enforces trust. Parameters ---------- - path: Path - Path to the pickle file. - enforce_columns: bool - If True require required columns, else fill missing with NaN and warn. + path : Path + Pickle file path. + enforce_columns : bool, default True + If True, missing required columns cause a ValueError; else they are created + and filled with NaN and a warning is emitted. + + Returns + ------- + pd.DataFrame + Normalized transition frame including (at minimum) required columns. Raises ------ ValueError - On unpickle failure or when the payload cannot be converted to a valid - transitions DataFrame (and enforce_columns is True). + On unpickle failure, structure mismatch (when ``enforce_columns`` is True), + or irrecoverable conversion errors. """ try: @@ -1490,6 +1705,7 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr "reward_total", } + # Keep optional list stable and explicit numeric_optional = { "reward_exit", "reward_idle", @@ -1499,6 +1715,12 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr "idle_ratio", "max_unrealized_profit", "min_unrealized_profit", + # Additive / shaping components + "reward_entry_additive", + "reward_exit_additive", + "current_potential", + "next_potential", + "is_invalid", } for col in list(numeric_expected | numeric_optional): @@ -1526,21 +1748,35 @@ def load_real_episodes(path: Path, *, enforce_columns: bool = True) -> pd.DataFr "action", "reward_total", } - missing = required - set(df.columns) - if missing: + missing_required = required - set(df.columns) + if missing_required: if enforce_columns: raise ValueError( - f"Loaded episodes data is missing required columns: {sorted(missing)}. " + f"Loaded episodes data is missing required columns: {sorted(missing_required)}. " f"Found columns: {sorted(list(df.columns))}." 
) - else: - warnings.warn( - f"Loaded episodes data is missing columns {sorted(missing)}; filling with NaN (enforce_columns=False)", - RuntimeWarning, - stacklevel=2, - ) - for col in missing: - df[col] = np.nan + warnings.warn( + f"Loaded episodes data is missing columns {sorted(missing_required)}; filling with NaN (enforce_columns=False)", + RuntimeWarning, + stacklevel=2, + ) + for col in missing_required: + df[col] = np.nan + + # Ensure all optional numeric columns exist for schema stability + for opt_col in numeric_optional: + if opt_col not in df.columns: + df[opt_col] = np.nan + + # Drop exact duplicates (rare but can appear after flattening) + before_dupes = len(df) + df = df.drop_duplicates() + if len(df) != before_dupes: + warnings.warn( + f"Removed {before_dupes - len(df)} duplicate transition row(s) while loading '{path}'.", + RuntimeWarning, + stacklevel=2, + ) return df @@ -1830,14 +2066,67 @@ def bootstrap_confidence_intervals( n_bootstrap: int = 10000, confidence_level: float = 0.95, seed: int = 42, + *, + strict_diagnostics: bool = False, ) -> Dict[str, Tuple[float, float, float]]: - """Bootstrap confidence intervals for key metrics.""" + """Estimate confidence intervals for mean of selected metrics via bootstrap. + + Graceful mode policy (``strict_diagnostics=False``): + - If the computed CI has zero or negative width (including inverted bounds), + it is automatically widened symmetrically around the sample mean by an + epsilon (``INTERNAL_GUARDS['degenerate_ci_epsilon']``) so the reporting + pipeline is not interrupted. A ``RewardDiagnosticsWarning`` is emitted to + make the adjustment transparent. + Strict mode policy (``strict_diagnostics=True``): + - The same anomalous condition triggers an immediate ``AssertionError`` + (fail-fast) to surface upstream causes such as a constant column or a + prior data sanitization issue. + Rationale: This dual mode avoids silently masking structural problems while + still supporting uninterrupted exploratory / CLI smoke runs. + + Purpose + ------- + Provide non-parametric uncertainty estimates for the mean of reward-related + metrics, robust to unknown or asymmetric distributions. + + Parameters + ---------- + df : pd.DataFrame + Source dataframe containing metric columns. + metrics : List[str] + Names of numeric columns to evaluate; silently skipped if absent. + n_bootstrap : int, default 10000 + Number of bootstrap resamples (with replacement). + confidence_level : float, default 0.95 + Nominal coverage probability for the two-sided interval. + seed : int, default 42 + Seed for local reproducible RNG (does not mutate global state). + + Returns + ------- + Dict[str, Tuple[float, float, float]] + Mapping metric -> (mean_estimate, ci_lower, ci_upper). + + Notes + ----- + - Metrics with < 10 non-null observations are skipped to avoid unstable CIs. + - Percentile method used; no bias correction or acceleration applied. + - Validation enforces finite, ordered, positive-width intervals. 
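+
+    Example (illustrative; ``df`` is any dataframe containing the metric columns)::
+
+        cis = bootstrap_confidence_intervals(
+            df, ["reward_total", "pnl"], n_bootstrap=1000, seed=7
+        )
+        mean_est, ci_low, ci_high = cis["reward_total"]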
+    """
    alpha = 1 - confidence_level
    lower_percentile = 100 * alpha / 2
    upper_percentile = 100 * (1 - alpha / 2)

    results = {}

+    # Advisory: very low bootstrap counts produce unstable CI widths
+    min_rec = int(INTERNAL_GUARDS.get("bootstrap_min_recommended", 200))
+    if n_bootstrap < min_rec:
+        warnings.warn(
+            f"n_bootstrap={n_bootstrap} < recommended minimum {min_rec}; confidence intervals may be unstable",
+            RewardDiagnosticsWarning,
+        )
+
    # Local RNG to avoid mutating global NumPy RNG state
    rng = np.random.default_rng(seed)

@@ -1864,13 +2153,34 @@ def bootstrap_confidence_intervals(
        results[metric] = (point_est, ci_low, ci_high)

    # Validate bootstrap confidence intervals
-    _validate_bootstrap_results(results)
+    _validate_bootstrap_results(results, strict_diagnostics=strict_diagnostics)

    return results


-def _validate_bootstrap_results(results: Dict[str, Tuple[float, float, float]]) -> None:
-    """Validate mathematical properties of bootstrap confidence intervals."""
+def _validate_bootstrap_results(
+    results: Dict[str, Tuple[float, float, float]], *, strict_diagnostics: bool
+) -> None:
+    """Validate structural and numerical integrity of bootstrap CI outputs.
+
+    Purpose
+    -------
+    Enforce confidence interval invariants, failing fast where the active mode
+    demands it.
+
+    Parameters
+    ----------
+    results : Dict[str, Tuple[float, float, float]]
+        Mapping from metric name to (mean, ci_low, ci_high); repaired entries
+        are written back in place in graceful mode.
+    strict_diagnostics : bool
+        If True, a non-positive CI width raises AssertionError; if False, the
+        interval is widened by an epsilon and a RewardDiagnosticsWarning is
+        emitted.
+
+    Returns
+    -------
+    None
+
+    Notes
+    -----
+    - Non-finite bounds and ordering violations always raise AssertionError.
+    - Non-positive width raises only in strict mode; graceful mode repairs it.
+    - Intentionally internal; external callers should rely on exceptions for flow.
+    """
    for metric, (mean, ci_low, ci_high) in results.items():
        # CI bounds must be finite
        if not (np.isfinite(mean) and np.isfinite(ci_low) and np.isfinite(ci_high)):
@@ -1889,22 +2199,61 @@ def _validate_bootstrap_results(results: Dict[str, Tuple[float, float, float]])
        # CI width should be positive (non-degenerate)
        width = ci_high - ci_low
        if width <= 0:
-            raise AssertionError(
-                f"Bootstrap CI for {metric}: non-positive width {width:.6f}"
+            if strict_diagnostics:
+                raise AssertionError(
+                    f"Bootstrap CI for {metric}: non-positive width {width:.6f}"
+                )
+            # Graceful mode: expand interval symmetrically
+            epsilon = (
+                INTERNAL_GUARDS.get("degenerate_ci_epsilon", 1e-9)
+                if width == 0
+                else abs(width) * 1e-6
+            )
+            center = mean
+            # Adjust only if current bounds are identical; otherwise enforce ordering minimally.
+            if ci_low == ci_high:
+                ci_low = center - epsilon
+                ci_high = center + epsilon
+            else:
+                # Ensure proper ordering if inverted or collapsed negatively.
+                lower = min(ci_low, ci_high) - epsilon
+                upper = max(ci_low, ci_high) + epsilon
+                ci_low, ci_high = lower, upper
+            results[metric] = (mean, ci_low, ci_high)
+            warnings.warn(
+                f"Degenerate bootstrap CI for {metric} adjusted to maintain positive width;"
+                f" original width={width:.6e}, epsilon={epsilon:.1e}",
+                RewardDiagnosticsWarning,
            )


def distribution_diagnostics(
-    df: pd.DataFrame, *, seed: int | None = None
+    df: pd.DataFrame, *, seed: int | None = None, strict_diagnostics: bool = False
) -> Dict[str, Any]:
-    """Distribution diagnostics: normality tests, skewness, kurtosis.
+    """Compute distributional diagnostics for selected numeric columns.
+
+    Purpose
+    -------
+    Aggregate normality test statistics and moment-based shape descriptors to
+    support assessment of modelling assumptions or transformation needs.

    Parameters
    ----------
    df : pd.DataFrame
-        Input samples.
+        Dataframe containing relevant numeric columns.
seed : int | None, optional - Reserved for future stochastic diagnostic extensions; currently unused. + Reserved for potential future stochastic extensions; unused presently. + + Returns + ------- + Dict[str, Any] + Mapping of column name -> diagnostic results (tests, moments, p-values). + + Notes + ----- + - Skips columns absent from the dataframe. + - Applies Shapiro-Wilk for n <= 5000 else D'Agostino's K2 due to cost. + - All numeric outputs are floats; non-finite intermediate results are ignored. """ diagnostics = {} _ = seed # placeholder to keep signature for future reproducibility extensions @@ -1919,8 +2268,16 @@ def distribution_diagnostics( diagnostics[f"{col}_mean"] = float(np.mean(data)) diagnostics[f"{col}_std"] = float(np.std(data, ddof=1)) - diagnostics[f"{col}_skewness"] = float(stats.skew(data)) - diagnostics[f"{col}_kurtosis"] = float(stats.kurtosis(data, fisher=True)) + skew_v = float(stats.skew(data)) + kurt_v = float(stats.kurtosis(data, fisher=True)) + diagnostics[f"{col}_skewness"] = skew_v + diagnostics[f"{col}_kurtosis"] = kurt_v + thr = INTERNAL_GUARDS.get("moment_extreme_threshold", 1e4) + if abs(skew_v) > thr or abs(kurt_v) > thr: + msg = f"Extreme moment(s) for {col}: skew={skew_v:.3e}, kurtosis={kurt_v:.3e} exceeds threshold {thr}." + if strict_diagnostics: + raise AssertionError(msg) + warnings.warn(msg, RewardDiagnosticsWarning) if len(data) < 5000: sw_stat, sw_pval = stats.shapiro(data) @@ -1942,23 +2299,50 @@ def distribution_diagnostics( (_osm, _osr), (_slope, _intercept, r) = probplot(data, dist="norm", plot=None) diagnostics[f"{col}_qq_r_squared"] = float(r**2) - _validate_distribution_diagnostics(diagnostics) + _validate_distribution_diagnostics( + diagnostics, strict_diagnostics=strict_diagnostics + ) return diagnostics -def _validate_distribution_diagnostics(diag: Dict[str, Any]) -> None: +def _validate_distribution_diagnostics( + diag: Dict[str, Any], *, strict_diagnostics: bool +) -> None: """Validate mathematical properties of distribution diagnostics. Ensures all reported statistics are finite and within theoretical bounds where applicable. Invoked automatically inside distribution_diagnostics(); raising AssertionError on violation enforces fail-fast semantics consistent with other validation helpers. """ - for key, value in diag.items(): + # Pre-compute zero-variance flags to allow graceful handling of undefined higher moments. + zero_var_columns = set() + for k, v in diag.items(): + if k.endswith("_std") and (not np.isfinite(v) or v == 0): + prefix = k[: -len("_std")] + zero_var_columns.add(prefix) + + for key, value in list(diag.items()): if any(suffix in key for suffix in ["_mean", "_std", "_skewness", "_kurtosis"]): if not np.isfinite(value): - raise AssertionError( - f"Distribution diagnostic {key} is not finite: {value}" + # Graceful degradation for constant distributions: skewness/kurtosis become NaN. 
+ constant_problem = any( + key.startswith(prefix) + and (key.endswith("_skewness") or key.endswith("_kurtosis")) + for prefix in zero_var_columns ) + if constant_problem and not strict_diagnostics: + fallback = INTERNAL_GUARDS.get( + "distribution_constant_fallback_moment", 0.0 + ) + diag[key] = fallback + warnings.warn( + f"Replaced undefined {key} (constant distribution) with {fallback}", + RewardDiagnosticsWarning, + ) + else: + raise AssertionError( + f"Distribution diagnostic {key} is not finite: {value}" + ) if key.endswith("_shapiro_pval"): if not (0 <= value <= 1): raise AssertionError( @@ -1966,12 +2350,38 @@ def _validate_distribution_diagnostics(diag: Dict[str, Any]) -> None: ) if key.endswith("_anderson_stat") or key.endswith("_anderson_critical_5pct"): if not np.isfinite(value): + prefix = key.rsplit("_", 2)[0] + if prefix in zero_var_columns and not strict_diagnostics: + fallback = INTERNAL_GUARDS.get( + "distribution_constant_fallback_moment", 0.0 + ) + diag[key] = fallback + warnings.warn( + f"Replaced undefined Anderson diagnostic {key} (constant distribution) with {fallback}", + RewardDiagnosticsWarning, + ) + continue raise AssertionError( f"Anderson statistic {key} must be finite, got {value}" ) if key.endswith("_qq_r_squared"): - if not (0 <= value <= 1): - raise AssertionError(f"Q-Q R^2 {key} must be in [0,1], got {value}") + if not ( + isinstance(value, (int, float)) + and np.isfinite(value) + and 0 <= value <= 1 + ): + prefix = key[: -len("_qq_r_squared")] + if prefix in zero_var_columns and not strict_diagnostics: + fallback_r2 = INTERNAL_GUARDS.get( + "distribution_constant_fallback_qq_r2", 1.0 + ) + diag[key] = fallback_r2 + warnings.warn( + f"Replaced undefined Q-Q R^2 {key} (constant distribution) with {fallback_r2}", + RewardDiagnosticsWarning, + ) + else: + raise AssertionError(f"Q-Q R^2 {key} must be in [0,1], got {value}") def build_argument_parser() -> argparse.ArgumentParser: @@ -1990,6 +2400,11 @@ def build_argument_parser() -> argparse.ArgumentParser: default=42, help="Random seed for reproducibility (default: 42).", ) + parser.add_argument( + "--skip_partial_dependence", + action="store_true", + help="Skip partial dependence computation to speed up analysis.", + ) parser.add_argument( "--stats_seed", type=int, @@ -2080,6 +2495,24 @@ def build_argument_parser() -> argparse.ArgumentParser: default="none", help="Multiple testing correction method for hypothesis tests (default: none).", ) + parser.add_argument( + "--strict_diagnostics", + action="store_true", + help=( + "Enable fail-fast mode for statistical diagnostics: raise on zero-width bootstrap CIs or undefined " + "skewness/kurtosis/Anderson/Q-Q metrics produced by constant distributions instead of applying graceful replacements." + ), + ) + parser.add_argument( + "--bootstrap_resamples", + type=int, + default=10000, + metavar="N", + help=( + "Number of bootstrap resamples for confidence intervals (default: 10000). " + "Lower this (e.g. 200-1000) for faster smoke tests; increase for more stable CI width estimates." 
+ ), + ) return parser @@ -2093,6 +2526,9 @@ def write_complete_statistical_analysis( *, adjust_method: str = "none", stats_seed: Optional[int] = None, + strict_diagnostics: bool = False, + bootstrap_resamples: int = 10000, + skip_partial_dependence: bool = False, ) -> None: """Generate a single comprehensive statistical analysis report with enhanced tests.""" output_dir.mkdir(parents=True, exist_ok=True) @@ -2145,20 +2581,21 @@ def write_complete_statistical_analysis( # Model analysis importance_df, analysis_stats, partial_deps, _model = _perform_feature_analysis( - df, seed + df, seed, skip_partial_dependence=skip_partial_dependence ) # Save feature importance CSV importance_df.to_csv(output_dir / "feature_importance.csv", index=False) # Save partial dependence CSVs - for feature, pd_df in partial_deps.items(): - pd_df.to_csv( - output_dir / f"partial_dependence_{feature}.csv", - index=False, - ) + if not skip_partial_dependence: + for feature, pd_df in partial_deps.items(): + pd_df.to_csv( + output_dir / f"partial_dependence_{feature}.csv", + index=False, + ) - # Enhanced statistics (always computed) + # Enhanced statistics test_seed = ( stats_seed if isinstance(stats_seed, int) @@ -2175,9 +2612,15 @@ def write_complete_statistical_analysis( "pnl", ] bootstrap_ci = bootstrap_confidence_intervals( - df, metrics_for_ci, n_bootstrap=10000, seed=test_seed + df, + metrics_for_ci, + n_bootstrap=int(bootstrap_resamples), + seed=test_seed, + strict_diagnostics=strict_diagnostics, + ) + dist_diagnostics = distribution_diagnostics( + df, seed=test_seed, strict_diagnostics=strict_diagnostics ) - dist_diagnostics = distribution_diagnostics(df, seed=test_seed) distribution_shift = None if real_df is not None: @@ -2194,7 +2637,41 @@ def write_complete_statistical_analysis( f.write(f"| Total Samples | {len(df):,} |\n") f.write(f"| Random Seed | {seed} |\n") f.write(f"| Max Trade Duration | {max_trade_duration} |\n") - f.write(f"| Profit Target (effective) | {profit_target:.4f} |\n\n") + # Blank separator to visually group core simulation vs PBRS parameters + f.write("| | |\n") + # Extra core PBRS parameters exposed in run configuration if present + _rp = ( + df.attrs.get("reward_params") + if isinstance(df.attrs.get("reward_params"), dict) + else {} + ) + exit_mode = _rp.get("exit_potential_mode", "canonical") + potential_gamma = _rp.get( + "potential_gamma", + DEFAULT_MODEL_REWARD_PARAMETERS.get( + "potential_gamma", POTENTIAL_GAMMA_DEFAULT + ), + ) + f.write(f"| exit_potential_mode | {exit_mode} |\n") + f.write(f"| potential_gamma | {potential_gamma} |\n") + # Blank separator before overrides block + f.write("| | |\n") + + overrides_pairs = [] + if _rp: + for k, default_v in DEFAULT_MODEL_REWARD_PARAMETERS.items(): + if k in ("exit_potential_mode", "potential_gamma"): + continue # already printed explicitly + try: + if k in _rp and _rp[k] != default_v: + overrides_pairs.append(f"{k}={_rp[k]}") + except Exception: + continue + if overrides_pairs: + f.write(f"| Overrides | {', '.join(sorted(overrides_pairs))} |\n") + else: + f.write("| Overrides | (none) |\n") + f.write("\n") f.write("---\n\n") @@ -2210,19 +2687,34 @@ def write_complete_statistical_analysis( f.write("### 1.2 Reward Statistics by Action\n\n") action_df = summary_stats["action_summary"].copy() + # Cast index to int for cleaner display (0,1,2,3,4) + action_df.index = action_df.index.astype(int) if action_df.index.name is None: action_df.index.name = "action" f.write(_df_to_md(action_df, index_name=action_df.index.name, ndigits=6)) 
f.write("### 1.3 Component Activation Rates\n\n") f.write("Percentage of samples where each reward component is non-zero:\n\n") - f.write( - _series_to_md( - summary_stats["component_share"], - value_name="activation_rate", - ndigits=6, - ) - ) + comp_share = summary_stats["component_share"].copy() + formatted_rows = [ + "| Component | Activation Rate |", + "|-----------|----------------|", + ] + # Enforce deterministic logical ordering of components if present + preferred_order = [ + "reward_invalid", + "reward_idle", + "reward_hold", + "reward_exit", + "reward_shaping", + "reward_entry_additive", + "reward_exit_additive", + ] + for comp in preferred_order: + if comp in comp_share.index: + val = comp_share.loc[comp] + formatted_rows.append(f"| {comp} | {val:.1%} |") + f.write("\n".join(formatted_rows) + "\n\n") f.write("### 1.4 Component Value Ranges\n\n") bounds_df = summary_stats["component_bounds"].copy() @@ -2269,7 +2761,7 @@ def write_complete_statistical_analysis( ) f.write("\n") - f.write("### 2.4 Component Activation Coverage\n\n") + f.write("### 2.4 Component Activation Rates\n\n") f.write("| Component | Activation Rate |\n") f.write("|-----------|----------------|\n") f.write(f"| Idle penalty | {representativity_stats['idle_activated']:.1%} |\n") @@ -2317,6 +2809,13 @@ def write_complete_statistical_analysis( if corr_df.index.name is None: corr_df.index.name = "feature" f.write(_df_to_md(corr_df, index_name=corr_df.index.name, ndigits=4)) + _dropped = relationship_stats.get("correlation_dropped") or [] + if _dropped: + f.write( + "\n_Constant features removed (no variance): " + + ", ".join(_dropped) + + "._\n\n" + ) # Section 3.5: PBRS Analysis f.write("### 3.5 PBRS (Potential-Based Reward Shaping) Analysis\n\n") @@ -2362,14 +2861,26 @@ def write_complete_statistical_analysis( # PBRS invariance check (canonical mode) total_shaping = df["reward_shaping"].sum() - if abs(total_shaping) < 1e-6: - f.write( - "✅ **PBRS Invariance:** Total shaping reward ≈ 0 (canonical mode preserved)\n\n" - ) + entry_add_total = df.get("reward_entry_additive", pd.Series([0])).sum() + exit_add_total = df.get("reward_exit_additive", pd.Series([0])).sum() + # Prepare invariance summary markdown block + if abs(total_shaping) < PBRS_INVARIANCE_TOL: + invariance_status = "✅ Canonical" + invariance_note = "Total shaping ≈ 0 (canonical invariance)" else: - f.write( - f"❌ **PBRS Invariance:** Total shaping reward = {total_shaping:.6f} (non-canonical behavior)\n\n" - ) + invariance_status = "❌ Non-canonical" + invariance_note = f"Total shaping = {total_shaping:.6f} (non-zero)" + + # Summarize PBRS invariance (added explicit section) + f.write("**PBRS Invariance Summary:**\n\n") + f.write("| Field | Value |\n") + f.write("|-------|-------|\n") + f.write(f"| Invariance | {invariance_status} |\n") + f.write(f"| Note | {invariance_note} |\n") + f.write(f"| Σ Shaping Reward | {total_shaping:.6f} |\n") + f.write(f"| Abs Σ Shaping Reward | {abs(total_shaping):.6e} |\n") + f.write(f"| Σ Entry Additive | {entry_add_total:.6f} |\n") + f.write(f"| Σ Exit Additive | {exit_add_total:.6f} |\n\n") else: f.write("_PBRS components not present in this analysis._\n\n") @@ -2396,7 +2907,12 @@ def write_complete_statistical_analysis( f.write(header + sep + "\n".join(rows) + "\n\n") f.write("**Exported Data:**\n") f.write("- Full feature importance: `feature_importance.csv`\n") - f.write("- Partial dependence plots: `partial_dependence_*.csv`\n\n") + if not skip_partial_dependence: + f.write("- Partial dependence plots: 
`partial_dependence_*.csv`\n\n") + else: + f.write( + "- Partial dependence plots: (skipped via --skip_partial_dependence)\n\n" + ) # Section 5: Statistical Validation if hypothesis_tests: @@ -2540,6 +3056,10 @@ def write_complete_statistical_analysis( f.write( "| KS p-value | > 0.05 | ✅ Yes: No significant difference |\n\n" ) + else: + # Placeholder keeps numbering stable and explicit + f.write("### 5.4 Distribution Shift Analysis\n\n") + f.write("_Not performed (no real episodes provided)._\n\n") # Footer f.write("---\n\n") @@ -2562,6 +3082,22 @@ def write_complete_statistical_analysis( ) if distribution_shift: f.write("6. **Distribution Shift** - Comparison with real trading data\n") + else: + f.write( + "6. **Distribution Shift** - Not performed (no real episodes provided)\n" + ) + if "reward_shaping" in df.columns: + _total_shaping = df["reward_shaping"].sum() + _canonical = abs(_total_shaping) < 1e-6 + f.write( + "7. **PBRS Invariance** - " + + ( + "Canonical (Σ shaping ≈ 0)" + if _canonical + else f"Non-canonical (Σ shaping = {_total_shaping:.6f})" + ) + + "\n" + ) f.write("\n") f.write("**Generated Files:**\n") f.write("- `reward_samples.csv` - Raw synthetic samples\n") @@ -2626,6 +3162,29 @@ def main() -> None: pnl_base_std=args.pnl_base_std, pnl_duration_vol_scale=args.pnl_duration_vol_scale, ) + # Post-simulation critical NaN validation (non-PBRS structural columns) + critical_cols = [ + "pnl", + "trade_duration", + "idle_duration", + "position", + "action", + "reward_total", + "reward_invalid", + "reward_idle", + "reward_hold", + "reward_exit", + ] + nan_issues = { + c: int(df[c].isna().sum()) + for c in critical_cols + if c in df.columns and df[c].isna().any() + } + if nan_issues: + raise AssertionError( + "NaN values detected in critical simulated columns: " + + ", ".join(f"{k}={v}" for k, v in nan_issues.items()) + ) # Attach simulation parameters for downstream manifest df.attrs["simulation_params"] = { "num_samples": args.num_samples, @@ -2640,6 +3199,8 @@ def main() -> None: "pnl_base_std": args.pnl_base_std, "pnl_duration_vol_scale": args.pnl_duration_vol_scale, } + # Attach resolved reward parameters for inline overrides rendering in report + df.attrs["reward_params"] = dict(params) args.output.mkdir(parents=True, exist_ok=True) csv_path = args.output / "reward_samples.csv" @@ -2666,6 +3227,9 @@ def main() -> None: stats_seed=args.stats_seed if getattr(args, "stats_seed", None) is not None else None, + strict_diagnostics=bool(getattr(args, "strict_diagnostics", False)), + bootstrap_resamples=getattr(args, "bootstrap_resamples", 10000), + skip_partial_dependence=bool(getattr(args, "skip_partial_dependence", False)), ) print( f"Complete statistical analysis saved to: {args.output / 'statistical_analysis.md'}" @@ -2673,53 +3237,35 @@ def main() -> None: # Generate manifest summarizing key metrics try: manifest_path = args.output / "manifest.json" - if (args.output / "feature_importance.csv").exists(): - fi_df = pd.read_csv(args.output / "feature_importance.csv") - top_features = fi_df.head(5)["feature"].tolist() - else: - top_features = [] - # Detect reward parameter overrides for traceability. 
- reward_param_overrides = {} - # Step 1: differences - for k in DEFAULT_MODEL_REWARD_PARAMETERS: - if k in params and params[k] != DEFAULT_MODEL_REWARD_PARAMETERS[k]: - reward_param_overrides[k] = params[k] - # Step 2: explicit flags - for k in DEFAULT_MODEL_REWARD_PARAMETERS: - if hasattr(args, k): - v = getattr(args, k) - if v is not None: - # Use the resolved param value for consistency - reward_param_overrides[k] = params.get(k, v) - + resolved_reward_params = dict(params) # already validated/normalized upstream manifest = { "generated_at": pd.Timestamp.now().isoformat(), "num_samples": int(len(df)), "seed": int(args.seed), "max_trade_duration": int(args.max_trade_duration), "profit_target_effective": float(profit_target * risk_reward_ratio), - "top_features": top_features, - "reward_param_overrides": reward_param_overrides, "pvalue_adjust_method": args.pvalue_adjust, "parameter_adjustments": adjustments, + "reward_params": resolved_reward_params, } sim_params = df.attrs.get("simulation_params", {}) if isinstance(sim_params, dict) and sim_params: import hashlib as _hashlib import json as _json + # Compose hash source from ALL simulation params and ALL resolved reward params for full reproducibility. _hash_source = { **{f"sim::{k}": sim_params[k] for k in sorted(sim_params)}, **{ - f"reward::{k}": reward_param_overrides[k] - for k in sorted(reward_param_overrides) + f"reward::{k}": resolved_reward_params[k] + for k in sorted(resolved_reward_params) }, } serialized = _json.dumps(_hash_source, sort_keys=True) manifest["params_hash"] = _hashlib.sha256( serialized.encode("utf-8") ).hexdigest() - manifest["params"] = sim_params + manifest["simulation_params"] = sim_params with manifest_path.open("w", encoding="utf-8") as mh: import json as _json @@ -2801,7 +3347,7 @@ def apply_transform(transform_name: str, value: float, **kwargs: Any) -> float: if transform_name not in transforms: warnings.warn( f"Unknown potential transform '{transform_name}'; falling back to tanh", - category=UserWarning, + RewardDiagnosticsWarning, stacklevel=2, ) return _apply_transform_tanh(value, **kwargs) @@ -2813,28 +3359,31 @@ def apply_transform(transform_name: str, value: float, **kwargs: Any) -> float: def _get_potential_gamma(params: RewardParams) -> float: - """Return potential_gamma with fallback (missing/invalid -> 0.95 + warning).""" - value = params.get("potential_gamma", None) + """Return validated potential_gamma. - if value is None: + Process: + - If NaN -> default POTENTIAL_GAMMA_DEFAULT with warning (missing or unparsable). + - If outside [0,1] -> clamp + warning including original value. + - Guarantee returned float ∈ [0,1]. + """ + gamma = _get_float_param(params, "potential_gamma", np.nan) + if not np.isfinite(gamma): warnings.warn( - "potential_gamma not found in config, using default value of 0.95. " - "This parameter controls the discount factor for PBRS potential shaping.", - category=UserWarning, + f"potential_gamma not specified or invalid; defaulting to {POTENTIAL_GAMMA_DEFAULT}", + RewardDiagnosticsWarning, stacklevel=2, ) - return 0.95 - - if isinstance(value, (int, float)): - return float(value) - - warnings.warn( - f"Invalid potential_gamma value: {value}. Using default 0.95. 
" - "Expected numeric value in [0, 1].", - category=UserWarning, - stacklevel=2, - ) - return 0.95 + return POTENTIAL_GAMMA_DEFAULT + if gamma < 0.0 or gamma > 1.0: + original = gamma + gamma = float(np.clip(gamma, 0.0, 1.0)) + warnings.warn( + f"potential_gamma={original} outside [0,1]; clamped to {gamma}", + RewardDiagnosticsWarning, + stacklevel=2, + ) + return gamma + return float(gamma) def _get_str_param(params: RewardParams, key: str, default: str) -> str: @@ -2860,159 +3409,69 @@ def _get_bool_param(params: RewardParams, key: str, default: bool) -> bool: def _compute_hold_potential( pnl: float, duration_ratio: float, params: RewardParams ) -> float: - """ - Compute PBRS hold potential: Φ(s) = scale * 0.5 * [T_pnl(g*pnl_ratio) + T_dur(g*duration_ratio)]. - - This implements the canonical PBRS potential function from Ng et al. (1999): - R'(s,a,s') = R_base(s,a,s') + γΦ(s') - Φ(s) - - Args: - pnl: Current profit/loss ratio - duration_ratio: Current duration as fraction of max_trade_duration - params: Reward parameters containing PBRS configuration - - Returns: - Potential value Φ(s) - """ + """Compute PBRS hold potential Φ(s).""" if not _get_bool_param(params, "hold_potential_enabled", True): return _fail_safely("hold_potential_disabled") - - scale = _get_float_param(params, "hold_potential_scale", 1.0) - gain = _get_float_param(params, "hold_potential_gain", 1.0) - transform_pnl = _get_str_param(params, "hold_potential_transform_pnl", "tanh") - transform_duration = _get_str_param( - params, "hold_potential_transform_duration", "tanh" + return _compute_bi_component( + kind="hold_potential", + pnl=pnl, + duration_ratio=duration_ratio, + params=params, + scale_key="hold_potential_scale", + gain_key="hold_potential_gain", + transform_pnl_key="hold_potential_transform_pnl", + transform_dur_key="hold_potential_transform_duration", + non_finite_key="non_finite_hold_potential", ) - sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0) - - # Apply transforms - if transform_pnl == "softsign_sharp": - t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness) - else: - t_pnl = apply_transform(transform_pnl, gain * pnl) - - if transform_duration == "softsign_sharp": - t_dur = apply_transform( - transform_duration, gain * duration_ratio, sharpness=sharpness - ) - else: - t_dur = apply_transform(transform_duration, gain * duration_ratio) - - potential = scale * 0.5 * (t_pnl + t_dur) - - # Validate numerical safety - if not np.isfinite(potential): - return _fail_safely("non_finite_hold_potential") - - return float(potential) def _compute_entry_additive( pnl: float, duration_ratio: float, params: RewardParams ) -> float: - """ - Compute entry additive reward (non-PBRS component). 
- - Args: - pnl: Current profit/loss ratio - duration_ratio: Current duration as fraction of max_trade_duration - params: Reward parameters - - Returns: - Entry additive reward - """ if not _get_bool_param(params, "entry_additive_enabled", False): return _fail_safely("entry_additive_disabled") - - scale = _get_float_param(params, "entry_additive_scale", 1.0) - gain = _get_float_param(params, "entry_additive_gain", 1.0) - transform_pnl = _get_str_param(params, "entry_additive_transform_pnl", "tanh") - transform_duration = _get_str_param( - params, "entry_additive_transform_duration", "tanh" + return _compute_bi_component( + kind="entry_additive", + pnl=pnl, + duration_ratio=duration_ratio, + params=params, + scale_key="entry_additive_scale", + gain_key="entry_additive_gain", + transform_pnl_key="entry_additive_transform_pnl", + transform_dur_key="entry_additive_transform_duration", + non_finite_key="non_finite_entry_additive", ) - sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0) - - # Apply transforms - if transform_pnl == "softsign_sharp": - t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness) - else: - t_pnl = apply_transform(transform_pnl, gain * pnl) - - if transform_duration == "softsign_sharp": - t_dur = apply_transform( - transform_duration, gain * duration_ratio, sharpness=sharpness - ) - else: - t_dur = apply_transform(transform_duration, gain * duration_ratio) - - additive = scale * 0.5 * (t_pnl + t_dur) - - # Validate numerical safety - if not np.isfinite(additive): - return _fail_safely("non_finite_entry_additive") - - return float(additive) def _compute_exit_additive( pnl: float, duration_ratio: float, params: RewardParams ) -> float: - """ - Compute exit additive reward (non-PBRS component). - - Args: - pnl: Final profit/loss ratio at exit - duration_ratio: Final duration as fraction of max_trade_duration - params: Reward parameters - - Returns: - Exit additive reward - """ if not _get_bool_param(params, "exit_additive_enabled", False): return _fail_safely("exit_additive_disabled") - - scale = _get_float_param(params, "exit_additive_scale", 1.0) - gain = _get_float_param(params, "exit_additive_gain", 1.0) - transform_pnl = _get_str_param(params, "exit_additive_transform_pnl", "tanh") - transform_duration = _get_str_param( - params, "exit_additive_transform_duration", "tanh" + return _compute_bi_component( + kind="exit_additive", + pnl=pnl, + duration_ratio=duration_ratio, + params=params, + scale_key="exit_additive_scale", + gain_key="exit_additive_gain", + transform_pnl_key="exit_additive_transform_pnl", + transform_dur_key="exit_additive_transform_duration", + non_finite_key="non_finite_exit_additive", ) - sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0) - # Apply transforms - if transform_pnl == "softsign_sharp": - t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness) - else: - t_pnl = apply_transform(transform_pnl, gain * pnl) - if transform_duration == "softsign_sharp": - t_dur = apply_transform( - transform_duration, gain * duration_ratio, sharpness=sharpness - ) - else: - t_dur = apply_transform(transform_duration, gain * duration_ratio) - - additive = scale * 0.5 * (t_pnl + t_dur) - - # Validate numerical safety - if not np.isfinite(additive): - return _fail_safely("non_finite_exit_additive") - - return float(additive) - - -def _compute_exit_potential( - pnl: float, duration_ratio: float, params: RewardParams, last_potential: float = 0.0 -) -> float: +def 
_compute_exit_potential(last_potential: float, params: RewardParams) -> float: """Compute next potential Φ(s') for closing/exit transitions. - Mirrors the original environment semantics: + Semantics: - canonical: Φ' = 0.0 - progressive_release: Φ' = Φ * (1 - decay) with decay clamped to [0,1] - spike_cancel: Φ' = Φ / γ (neutralizes shaping spike ≈ 0 net effect) if γ>0 else Φ - retain_previous: Φ' = Φ - Invalid modes fall back to canonical. - Any non-finite resulting potential is coerced to 0.0. + + Invalid modes fall back to canonical. Any non-finite resulting potential is + coerced to 0.0. """ mode = _get_str_param(params, "exit_potential_mode", "canonical") if mode == "canonical": @@ -3020,9 +3479,26 @@ def _compute_exit_potential( if mode == "progressive_release": decay = _get_float_param(params, "exit_potential_decay", 0.5) - if not np.isfinite(decay) or decay < 0.0: + if not np.isfinite(decay): + warnings.warn( + "exit_potential_decay invalid (NaN/inf); defaulting to 0.5", + RewardDiagnosticsWarning, + stacklevel=2, + ) decay = 0.5 + if decay < 0.0: + warnings.warn( + f"exit_potential_decay={decay} < 0; clamped to 0.0", + RewardDiagnosticsWarning, + stacklevel=2, + ) + decay = 0.0 if decay > 1.0: + warnings.warn( + f"exit_potential_decay={decay} > 1; clamped to 1.0", + RewardDiagnosticsWarning, + stacklevel=2, + ) decay = 1.0 next_potential = last_potential * (1.0 - decay) elif mode == "spike_cancel": @@ -3090,10 +3566,8 @@ def apply_potential_shaping( # Enforce PBRS invariance (auto-disable additives in canonical mode) params = _enforce_pbrs_invariance(params) - # Validate gamma (with None handling matching original environment) + # Resolve gamma gamma = _get_potential_gamma(params) - if not (0.0 <= gamma <= 1.0): - raise ValueError(f"Invalid gamma: {gamma}. Must be in [0, 1]") # Compute current potential Φ(s) current_potential = _compute_hold_potential( @@ -3102,9 +3576,7 @@ def apply_potential_shaping( # Compute next potential Φ(s') if is_terminal: - next_potential = _compute_exit_potential( - next_pnl, next_duration_ratio, params, last_potential - ) + next_potential = _compute_exit_potential(last_potential, params) else: next_potential = _compute_hold_potential(next_pnl, next_duration_ratio, params) @@ -3121,9 +3593,6 @@ def apply_potential_shaping( else 0.0 ) - # Invariance diagnostic - _log_pbrs_invariance_warning(params) - # Total reward total_reward = base_reward + shaping_reward + entry_additive + exit_additive @@ -3136,66 +3605,68 @@ def apply_potential_shaping( def _enforce_pbrs_invariance(params: RewardParams) -> RewardParams: - """Enforce PBRS invariance by auto-disabling additives in canonical mode. - - Matches original environment behavior: canonical mode automatically - disables entry/exit additives to preserve theoretical invariance. - - PBRS invariance (Ng et al. 1999) requires: - - canonical exit_potential_mode (terminal Φ=0) - - No path-dependent additive reward components enabled. - - Returns modified params dict with invariance enforced. 
- """ + """Enforce PBRS invariance by auto-disabling additives in canonical mode.""" mode = _get_str_param(params, "exit_potential_mode", "canonical") - if mode == "canonical": - # Make a copy to avoid mutating input - enforced_params = dict(params) - entry_enabled = _get_bool_param(params, "entry_additive_enabled", False) - exit_enabled = _get_bool_param(params, "exit_additive_enabled", False) - - if entry_enabled: - warnings.warn( - "Disabling entry additive to preserve PBRS invariance (canonical mode).", - category=UserWarning, - stacklevel=2, - ) - enforced_params["entry_additive_enabled"] = False - - if exit_enabled: - warnings.warn( - "Disabling exit additive to preserve PBRS invariance (canonical mode).", - category=UserWarning, - stacklevel=2, - ) - enforced_params["exit_additive_enabled"] = False - - return enforced_params + if mode != "canonical": + return params + if params.get("_pbrs_invariance_applied"): + return params + + entry_enabled = _get_bool_param(params, "entry_additive_enabled", False) + exit_enabled = _get_bool_param(params, "exit_additive_enabled", False) + if entry_enabled: + warnings.warn( + "Disabling entry additive to preserve PBRS invariance (canonical mode).", + RewardDiagnosticsWarning, + stacklevel=2, + ) + params["entry_additive_enabled"] = False + if exit_enabled: + warnings.warn( + "Disabling exit additive to preserve PBRS invariance (canonical mode).", + RewardDiagnosticsWarning, + stacklevel=2, + ) + params["exit_additive_enabled"] = False + params["_pbrs_invariance_applied"] = True return params -def _log_pbrs_invariance_warning(params: RewardParams) -> None: - """Log an informational message if invariance conditions are violated. +def _compute_bi_component( + kind: str, + pnl: float, + duration_ratio: float, + params: RewardParams, + scale_key: str, + gain_key: str, + transform_pnl_key: str, + transform_dur_key: str, + non_finite_key: str, +) -> float: + """Generic helper for (pnl, duration) bi-component transforms. - PBRS invariance (Ng et al. 1999) requires: - - canonical exit_potential_mode (terminal Φ=0) - - No path-dependent additive reward components enabled. - This mirrors original environment diagnostic behavior. + Consolidates duplicated logic across hold potential & additives. 
""" - mode = _get_str_param(params, "exit_potential_mode", "canonical") - if mode == "canonical": - if _get_bool_param(params, "entry_additive_enabled", False) or _get_bool_param( - params, "exit_additive_enabled", False - ): - warnings.warn( - ( - "PBRS invariance relaxed: canonical mode with additive components enabled " - f"(entry_additive_enabled={_get_bool_param(params, 'entry_additive_enabled', False)}, " - f"exit_additive_enabled={_get_bool_param(params, 'exit_additive_enabled', False)})" - ), - category=UserWarning, - stacklevel=2, - ) + scale = _get_float_param(params, scale_key, 1.0) + gain = _get_float_param(params, gain_key, 1.0) + transform_pnl = _get_str_param(params, transform_pnl_key, "tanh") + transform_duration = _get_str_param(params, transform_dur_key, "tanh") + sharpness = _get_float_param(params, "potential_softsign_sharpness", 1.0) + + if transform_pnl == "softsign_sharp": + t_pnl = apply_transform(transform_pnl, gain * pnl, sharpness=sharpness) + else: + t_pnl = apply_transform(transform_pnl, gain * pnl) + if transform_duration == "softsign_sharp": + t_dur = apply_transform( + transform_duration, gain * duration_ratio, sharpness=sharpness + ) + else: + t_dur = apply_transform(transform_duration, gain * duration_ratio) + value = scale * 0.5 * (t_pnl + t_dur) + if not np.isfinite(value): + return _fail_safely(non_finite_key) + return float(value) if __name__ == "__main__": diff --git a/ReforceXY/reward_space_analysis/test_cli.py b/ReforceXY/reward_space_analysis/test_cli.py new file mode 100644 index 0000000..71e2f6d --- /dev/null +++ b/ReforceXY/reward_space_analysis/test_cli.py @@ -0,0 +1,400 @@ +"""CLI integration smoke test for reward_space_analysis. + +Purpose +------- +Execute a bounded, optionally shuffled subset of parameter combinations for +`reward_space_analysis.py` to verify end-to-end execution (smoke / regression +signal, not correctness proof). + +Key features +------------ +* Deterministic sampling with optional shuffling (`--shuffle-seed`). +* Optional duplication of first N scenarios under strict diagnostics + (`--strict-sample`). +* Per-scenario timing and aggregate statistics (mean / min / max seconds). +* Simple warning counting + (patch adds) breakdown of distinct warning lines. +* Scenario list + seed metadata exported for reproducibility. +* Direct CLI forwarding of bootstrap resample count to child process. + +Usage +----- +python test_cli.py --samples 50 --out-dir ../sample_run_output_smoke \ + --shuffle-seed 123 --strict-sample 3 --bootstrap-resamples 200 + +JSON Summary fields +------------------- +total, ok, failures[], warnings_total, warnings_breakdown, mean_seconds, +max_seconds, min_seconds, strict_duplicated, scenarios (list), seeds (metadata). + +Exit codes +---------- +0: success, 1: failures present, 130: interrupted (partial summary written). 
+""" + +from __future__ import annotations + +import argparse +import itertools +import json +import os +import platform +import random +import subprocess +import sys +import tempfile +import time +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, TypedDict + +ConfigTuple = Tuple[str, str, float, int, int, int] + + +SUMMARY_FILENAME = "reward_space_cli_smoke_results.json" + + +class ScenarioResult(TypedDict): + config: ConfigTuple + status: str + stdout: str + stderr: str + strict: bool + seconds: Optional[float] + warnings: int + + +class SummaryResult(TypedDict): + total: int + ok: int + failures: List[ScenarioResult] + warnings_total: int + mean_seconds: Optional[float] + max_seconds: Optional[float] + min_seconds: Optional[float] + strict_duplicated: int + + +def build_arg_matrix( + max_scenarios: int = 40, + shuffle_seed: Optional[int] = None, +) -> List[ConfigTuple]: + exit_potential_modes = [ + "canonical", + "progressive_release", + "retain_previous", + "spike_cancel", + ] + exit_attenuation_modes = ["linear", "sqrt", "power", "half_life", "legacy"] + potential_gammas = [0.0, 0.5, 0.95, 0.999] + hold_enabled = [0, 1] + entry_additive_enabled = [0, 1] + exit_additive_enabled = [0, 1] + + product_iter = itertools.product( + exit_potential_modes, + exit_attenuation_modes, + potential_gammas, + hold_enabled, + entry_additive_enabled, + exit_additive_enabled, + ) + + full: List[ConfigTuple] = list(product_iter) + if shuffle_seed is not None: + rnd = random.Random(shuffle_seed) + rnd.shuffle(full) + if max_scenarios >= len(full): + return full + step = len(full) / max_scenarios + idx_pos = 0.0 + selected: List[ConfigTuple] = [] + for _ in range(max_scenarios): + idx = int(idx_pos) + if idx >= len(full): + idx = len(full) - 1 + selected.append(full[idx]) + idx_pos += step + return selected + + +def run_scenario( + script: Path, + out_dir: Path, + idx: int, + total: int, + base_samples: int, + conf: ConfigTuple, + strict: bool, + bootstrap_resamples: int, + timeout: int, +) -> ScenarioResult: + ( + exit_potential_mode, + exit_attenuation_mode, + potential_gamma, + hold_enabled, + entry_additive_enabled, + exit_additive_enabled, + ) = conf + scenario_dir = out_dir / f"scenario_{idx:02d}" + scenario_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + sys.executable, + str(script), + "--num_samples", + str(base_samples), + "--output", + str(scenario_dir), + "--exit_potential_mode", + exit_potential_mode, + "--exit_attenuation_mode", + exit_attenuation_mode, + "--potential_gamma", + str(potential_gamma), + "--hold_potential_enabled", + str(hold_enabled), + "--entry_additive_enabled", + str(entry_additive_enabled), + "--exit_additive_enabled", + str(exit_additive_enabled), + "--seed", + str(100 + idx), + ] + # Forward bootstrap resamples explicitly + cmd += ["--bootstrap_resamples", str(bootstrap_resamples)] + if strict: + cmd.append("--strict_diagnostics") + start = time.perf_counter() + try: + proc = subprocess.run( + cmd, capture_output=True, text=True, check=False, timeout=timeout + ) + except subprocess.TimeoutExpired: + return { + "config": conf, + "status": "timeout", + "stderr": "", + "stdout": "", + "strict": strict, + "seconds": None, + "warnings": 0, + } + status = "ok" if proc.returncode == 0 else f"error({proc.returncode})" + end = time.perf_counter() + combined = (proc.stdout + "\n" + proc.stderr).lower() + warn_count = combined.count("warning") + return { + "config": conf, + "status": status, + "stdout": proc.stdout[-5000:], + "stderr": 
proc.stderr[-5000:], + "strict": strict, + "seconds": round(end - start, 4), + "warnings": warn_count, + } + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--samples", type=int, default=40, help="num synthetic samples per scenario" + ) + parser.add_argument( + "--out-dir", + type=str, + default="sample_run_output_smoke", + help="output parent directory", + ) + parser.add_argument( + "--shuffle-seed", + type=int, + default=None, + help="If set, shuffle full scenario space before sampling a diverse subset", + ) + parser.add_argument( + "--strict-sample", + type=int, + default=0, + help="Duplicate the first N scenarios executed again with --strict_diagnostics", + ) + parser.add_argument( + "--max-scenarios", + type=int, + default=40, + help="Maximum number of (non-strict) scenarios before strict duplication", + ) + parser.add_argument( + "--bootstrap-resamples", + type=int, + default=120, + help="Number of bootstrap resamples to pass to child processes (speed/perf tradeoff)", + ) + parser.add_argument( + "--per-scenario-timeout", + type=int, + default=600, + help="Timeout (seconds) per child process (default: 600)", + ) + parser.add_argument( + "--store-full-logs", + action="store_true", + help="If set, store full stdout/stderr (may be large) instead of tail truncation.", + ) + args = parser.parse_args() + + # Basic validation + if args.max_scenarios <= 0: + parser.error("--max-scenarios must be > 0") + if args.samples <= 0: + parser.error("--samples must be > 0") + if args.strict_sample < 0: + parser.error("--strict-sample must be >= 0") + if args.bootstrap_resamples <= 0: + parser.error("--bootstrap-resamples must be > 0") + + script = Path(__file__).parent / "reward_space_analysis.py" + out_dir = Path(args.out_dir) + out_dir.mkdir(parents=True, exist_ok=True) + + scenarios = build_arg_matrix( + max_scenarios=args.max_scenarios, shuffle_seed=args.shuffle_seed + ) + + # Prepare list of (conf, strict_flag) + scenario_pairs: List[Tuple[ConfigTuple, bool]] = [(c, False) for c in scenarios] + strict_n = max(0, min(args.strict_sample, len(scenarios))) + for c in scenarios[:strict_n]: + scenario_pairs.append((c, True)) + + results: List[ScenarioResult] = [] + total = len(scenario_pairs) + interrupted = False + try: + for i, (conf, strict_flag) in enumerate(scenario_pairs, start=1): + # Ensure child process sees the chosen bootstrap resamples via direct CLI args only + res = run_scenario( + script, + out_dir, + i, + total, + args.samples, + conf, + strict=strict_flag, + bootstrap_resamples=args.bootstrap_resamples, + timeout=args.per_scenario_timeout, + ) + results.append(res) + status = res["status"] + tag = "[strict]" if strict_flag else "" + secs = res.get("seconds") + secs_str = f" {secs:.2f}s" if secs is not None else "" + print(f"[{i}/{total}] {conf} {tag} -> {status}{secs_str}") + except KeyboardInterrupt: + interrupted = True + print("\nKeyboardInterrupt received: writing partial summary...") + + ok = sum(1 for r in results if r["status"] == "ok") + failures = [r for r in results if r["status"] != "ok"] + total_warnings = sum(r["warnings"] for r in results) + durations: List[float] = [ + float(r["seconds"]) for r in results if isinstance(r["seconds"], float) + ] + summary: SummaryResult = { + "total": len(results), + "ok": ok, + "failures": failures, + "warnings_total": total_warnings, + "mean_seconds": round(sum(durations) / len(durations), 4) + if durations + else None, + "max_seconds": max(durations) if durations else None, + "min_seconds": min(durations) 
if durations else None, + "strict_duplicated": strict_n, + } + # Build warning breakdown (simple line fingerprinting) + warning_counts: Dict[str, int] = {} + for r in results: + text = (r["stderr"] + "\n" + r["stdout"]).splitlines() + for line in text: + if "warning" in line.lower(): + # Fingerprint: trim + collapse whitespace + limit length + fp = " ".join(line.strip().split())[:160] + warning_counts[fp] = warning_counts.get(fp, 0) + 1 + + # Scenario export (list of configs only, excluding strict flag duplication detail) + scenario_list = [list(c) for c, _ in scenario_pairs] + + # Collect environment + reproducibility metadata + def _git_hash() -> Optional[str]: + try: + proc = subprocess.run( + ["git", "rev-parse", "--short", "HEAD"], + capture_output=True, + text=True, + check=False, + timeout=2, + ) + if proc.returncode == 0: + return proc.stdout.strip() or None + except Exception: + return None + return None + + summary_extra: Dict[str, Any] = { + "warnings_breakdown": warning_counts, + "scenarios": scenario_list, + "seeds": { + "shuffle_seed": args.shuffle_seed, + "strict_sample": args.strict_sample, + "max_scenarios": args.max_scenarios, + "bootstrap_resamples": args.bootstrap_resamples, + }, + "metadata": { + "timestamp_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "python_version": sys.version.split()[0], + "platform": platform.platform(), + "git_commit": _git_hash(), + "schema_version": 1, + "per_scenario_timeout": args.per_scenario_timeout, + }, + } + serializable: Dict[str, Any] + if interrupted: + serializable = {**summary, **summary_extra, "interrupted": True} + else: + serializable = {**summary, **summary_extra} + # Atomic write to avoid corrupt partial files + tmp_fd, tmp_path = tempfile.mkstemp(prefix="_tmp_summary_", dir=str(out_dir)) + try: + with os.fdopen(tmp_fd, "w", encoding="utf-8") as fh: + json.dump(serializable, fh, indent=2) + os.replace(tmp_path, out_dir / SUMMARY_FILENAME) + except Exception: + # Best effort fallback + try: + Path(out_dir / SUMMARY_FILENAME).write_text( + json.dumps(serializable, indent=2), encoding="utf-8" + ) + finally: + if os.path.exists(tmp_path): + try: + os.remove(tmp_path) + except OSError: + pass + else: + if os.path.exists(tmp_path): # Should have been moved; defensive cleanup + try: + os.remove(tmp_path) + except OSError: + pass + print("Summary written to", out_dir / SUMMARY_FILENAME) + if not interrupted and summary["failures"]: + print("Failures detected:") + for f in summary["failures"]: + print(f" - {f['config']}: {f['status']}") + sys.exit(1) + if interrupted: + sys.exit(130) + + +if __name__ == "__main__": # pragma: no cover + main() diff --git a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py index e5f2843..fadece4 100644 --- a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py @@ -11,6 +11,7 @@ import dataclasses import json import math import pickle +import re import shutil import subprocess import sys @@ -18,7 +19,7 @@ import tempfile import unittest import warnings from pathlib import Path -from typing import Iterable, Sequence +from typing import Iterable, Optional, Sequence, Union import numpy as np import pandas as pd @@ -36,7 +37,10 @@ PBRS_REQUIRED_PARAMS = PBRS_INTEGRATION_PARAMS + ["exit_potential_mode"] # Import functions to test try: from reward_space_analysis import ( + ATTENUATION_MODES, + ATTENUATION_MODES_WITH_LEGACY, 
DEFAULT_MODEL_REWARD_PARAMETERS, + PBRS_INVARIANCE_TOL, Actions, Positions, RewardContext, @@ -68,6 +72,44 @@ except ImportError as e: sys.exit(1) +# --- Helper factories (DRY for contexts and params) --- +def base_params(**overrides) -> dict: + """Return a fresh copy of DEFAULT_MODEL_REWARD_PARAMETERS with overrides applied. + + Ensures tests do not mutate the global defaults inadvertently. + """ + params = DEFAULT_MODEL_REWARD_PARAMETERS.copy() + params.update(overrides) + return params + + +def make_ctx( + *, + pnl: float = 0.0, + trade_duration: int = 0, + idle_duration: int = 0, + max_trade_duration: int = 100, + max_unrealized_profit: float = 0.0, + min_unrealized_profit: float = 0.0, + position: Positions = Positions.Neutral, + action: Actions = Actions.Neutral, +) -> RewardContext: + """Factory for RewardContext with neutral defaults. + + Only fields explicitly varied in a test need to be specified; keeps test bodies concise. + """ + return RewardContext( + pnl=pnl, + trade_duration=trade_duration, + idle_duration=idle_duration, + max_trade_duration=max_trade_duration, + max_unrealized_profit=max_unrealized_profit, + min_unrealized_profit=min_unrealized_profit, + position=position, + action=action, + ) + + class RewardSpaceTestBase(unittest.TestCase): """Base class with common test utilities. @@ -102,10 +144,10 @@ class RewardSpaceTestBase(unittest.TestCase): def assertAlmostEqualFloat( self, - first: float | int, - second: float | int, + first: Union[float, int], + second: Union[float, int], tolerance: float = 1e-6, - msg: str | None = None, + msg: Union[str, None] = None, ) -> None: """Absolute tolerance compare with explicit failure and finite check.""" self.assertFinite(first, name="a") @@ -118,7 +160,7 @@ class RewardSpaceTestBase(unittest.TestCase): ) # --- Statistical bounds helpers (factorize redundancy) --- - def assertPValue(self, value: float | int, msg: str = "") -> None: + def assertPValue(self, value: Union[float, int], msg: str = "") -> None: """Assert a p-value is finite and within [0,1].""" self.assertFinite(value, name="p-value") self.assertGreaterEqual(value, 0.0, msg or f"p-value < 0: {value}") @@ -126,10 +168,10 @@ class RewardSpaceTestBase(unittest.TestCase): def assertDistanceMetric( self, - value: float | int, + value: Union[float, int], *, non_negative: bool = True, - upper: float | None = None, + upper: Optional[float] = None, name: str = "metric", ) -> None: """Generic distance/divergence bounds: finite, optional non-negativity and optional upper bound.""" @@ -141,7 +183,7 @@ class RewardSpaceTestBase(unittest.TestCase): def assertEffectSize( self, - value: float | int, + value: Union[float, int], *, lower: float = -1.0, upper: float = 1.0, @@ -152,17 +194,17 @@ class RewardSpaceTestBase(unittest.TestCase): self.assertGreaterEqual(value, lower, f"{name} < {lower}: {value}") self.assertLessEqual(value, upper, f"{name} > {upper}: {value}") - def assertFinite(self, value: float | int, name: str = "value") -> None: + def assertFinite(self, value: Union[float, int], name: str = "value") -> None: """Assert scalar is finite.""" if not np.isfinite(value): # low-level base check to avoid recursion self.fail(f"{name} not finite: {value}") def assertMonotonic( self, - seq: Sequence[float | int] | Iterable[float | int], + seq: Union[Sequence[Union[float, int]], Iterable[Union[float, int]]], *, - non_increasing: bool | None = None, - non_decreasing: bool | None = None, + non_increasing: Optional[bool] = None, + non_decreasing: Optional[bool] = None, tolerance: float = 
0.0, name: str = "sequence", ) -> None: @@ -188,9 +230,9 @@ class RewardSpaceTestBase(unittest.TestCase): def assertWithin( self, - value: float | int, - low: float | int, - high: float | int, + value: Union[float, int], + low: Union[float, int], + high: Union[float, int], *, name: str = "value", inclusive: bool = True, @@ -285,8 +327,24 @@ class TestIntegration(RewardSpaceTestBase): with open(self.output_path / run_dir / "manifest.json", "r") as f: manifest = json.load(f) - required_keys = {"generated_at", "num_samples", "seed", "params_hash"} - self.assertTrue(required_keys.issubset(manifest.keys())) + required_keys = { + "generated_at", + "num_samples", + "seed", + "params_hash", + "reward_params", + "simulation_params", + } + self.assertTrue( + required_keys.issubset(manifest.keys()), + f"Missing keys: {required_keys - set(manifest.keys())}", + ) + self.assertIsInstance(manifest["reward_params"], dict) + self.assertIsInstance(manifest["simulation_params"], dict) + # Legacy fields must be absent + self.assertNotIn("top_features", manifest) + self.assertNotIn("reward_param_overrides", manifest) + self.assertNotIn("params", manifest) self.assertEqual(manifest["num_samples"], self.TEST_SAMPLES) self.assertEqual(manifest["seed"], self.SEED) @@ -447,10 +505,9 @@ class TestRewardAlignment(RewardSpaceTestBase): def test_basic_reward_calculation(self): """Test basic reward calculation consistency.""" - context = RewardContext( + context = make_ctx( pnl=self.TEST_PROFIT_TARGET, trade_duration=10, - idle_duration=0, max_trade_duration=100, max_unrealized_profit=0.025, min_unrealized_profit=0.015, @@ -484,10 +541,9 @@ class TestRewardAlignment(RewardSpaceTestBase): """ # Build context where pnl == 0.0 and max_unrealized_profit == pnl - ctx = RewardContext( + ctx = make_ctx( pnl=0.0, trade_duration=1, - idle_duration=0, max_trade_duration=100, max_unrealized_profit=0.0, min_unrealized_profit=-0.02, @@ -498,7 +554,7 @@ class TestRewardAlignment(RewardSpaceTestBase): params = self.DEFAULT_PARAMS.copy() profit_target = self.TEST_PROFIT_TARGET * self.TEST_RR - pnl_factor = _get_pnl_factor(params, ctx, profit_target) + pnl_factor = _get_pnl_factor(params, ctx, profit_target, self.TEST_RR) # Expect no efficiency modulation: factor should be >= 0 and close to 1.0 self.assertFinite(pnl_factor, name="pnl_factor") self.assertAlmostEqualFloat(pnl_factor, 1.0, tolerance=1e-6) @@ -513,7 +569,7 @@ class TestRewardAlignment(RewardSpaceTestBase): base_factor = self.TEST_BASE_FACTOR idle_duration = 40 # below large threshold, near small threshold - context = RewardContext( + context = make_ctx( pnl=0.0, trade_duration=0, idle_duration=idle_duration, @@ -557,7 +613,7 @@ class TestRewardAlignment(RewardSpaceTestBase): params = self.DEFAULT_PARAMS.copy() params.update( { - "potential_gamma": 0.95, + "potential_gamma": DEFAULT_MODEL_REWARD_PARAMETERS["potential_gamma"], "exit_potential_mode": "progressive_release", "exit_potential_decay": 5.0, # should clamp to 1.0 "hold_potential_enabled": True, @@ -598,7 +654,11 @@ class TestRewardAlignment(RewardSpaceTestBase): current_dur = 0.4 prev_potential = _compute_hold_potential(current_pnl, current_dur, params) # Use helper accessor to avoid union type issues - gamma = _get_float_param(params, "potential_gamma", 0.95) + gamma = _get_float_param( + params, + "potential_gamma", + DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95), + ) expected_next = ( prev_potential / gamma if gamma not in (0.0, None) else prev_potential ) @@ -632,7 +692,7 @@ class 
TestRewardAlignment(RewardSpaceTestBase): risk_reward_ratio = 1.0 # Two contexts with different idle durations - ctx_a = RewardContext( + ctx_a = make_ctx( pnl=0.0, trade_duration=0, idle_duration=20, @@ -888,7 +948,7 @@ class TestRewardAlignment(RewardSpaceTestBase): def test_idle_penalty_zero_when_profit_target_zero(self): """If profit_target=0 → idle_factor=0 → idle penalty must be exactly 0 for neutral idle state.""" - context = RewardContext( + context = make_ctx( pnl=0.0, trade_duration=0, idle_duration=30, @@ -1238,7 +1298,7 @@ class TestPublicAPI(RewardSpaceTestBase): def test_bootstrap_confidence_intervals(self): """Test bootstrap confidence intervals calculation.""" # Create test data - np.random.seed(42) + np.random.seed(self.SEED) test_data = pd.DataFrame( { "reward_total": np.random.normal(0, 1, 100), @@ -1372,7 +1432,7 @@ class TestStatisticalValidation(RewardSpaceTestBase): def test_distribution_metrics_mathematical_bounds(self): """Test mathematical bounds and validity of distribution shift metrics.""" # Create two different distributions for testing - np.random.seed(42) + np.random.seed(self.SEED) df1 = pd.DataFrame( { "pnl": np.random.normal(0, self.TEST_PNL_STD, 500), @@ -1555,7 +1615,7 @@ class TestStatisticalValidation(RewardSpaceTestBase): def test_statistical_functions_bounds_validation(self): """Test that all statistical functions respect mathematical bounds.""" # Create test data with known statistical properties - np.random.seed(42) + np.random.seed(self.SEED) df = pd.DataFrame( { "reward_total": np.random.normal(0, 1, 300), @@ -1823,7 +1883,8 @@ class TestBoundaryConditions(RewardSpaceTestBase): def test_different_exit_attenuation_modes(self): """Test different exit attenuation modes (legacy, sqrt, linear, power, half_life).""" - modes = ["legacy", "sqrt", "linear", "power", "half_life"] + # Use canonical constant (includes legacy) instead of hardcoded literals + modes = ATTENUATION_MODES_WITH_LEGACY for mode in modes: with self.subTest(mode=mode): @@ -1918,9 +1979,8 @@ class TestHelperFunctions(RewardSpaceTestBase): def test_statistical_functions(self): """Test statistical functions.""" - # Create test data with specific patterns - np.random.seed(42) + np.random.seed(self.SEED) test_data = pd.DataFrame( { "reward_total": np.random.normal(0, 1, 200), @@ -2422,8 +2482,8 @@ class TestRewardRobustness(RewardSpaceTestBase): Modes covered: sqrt, linear, power, half_life, plateau+linear (after grace). Legacy is excluded (non-monotonic by design). Plateau+linear includes flat grace then monotonic. 
""" - - modes = ["sqrt", "linear", "power", "half_life", "plateau_linear"] + # Start from canonical modes (excluding legacy already) and add synthetic plateau_linear variant + modes = list(ATTENUATION_MODES) + ["plateau_linear"] base_factor = self.TEST_BASE_FACTOR pnl = 0.05 pnl_factor = 1.0 @@ -2622,13 +2682,14 @@ class TestRewardRobustness(RewardSpaceTestBase): params = self.DEFAULT_PARAMS.copy() # Try multiple modes / extreme params - modes = ["linear", "power", "half_life", "sqrt", "legacy", "linear_plateau"] + # All canonical modes + legacy + synthetic plateau variant + modes = list(ATTENUATION_MODES_WITH_LEGACY) + ["plateau_linear"] base_factor = self.TEST_BASE_FACTOR pnl = 0.05 pnl_factor = 2.0 # amplified for mode in modes: params_mode = params.copy() - if mode == "linear_plateau": + if mode == "plateau_linear": params_mode["exit_attenuation_mode"] = "linear" params_mode["exit_plateau"] = True params_mode["exit_plateau_grace"] = 0.4 @@ -3134,7 +3195,7 @@ class TestPBRSIntegration(RewardSpaceTestBase): def test_exit_potential_canonical(self): """Test exit potential in canonical mode.""" params = {"exit_potential_mode": "canonical"} - val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0) + val = _compute_exit_potential(1.0, params) self.assertEqual(val, 0.0) def test_exit_potential_progressive_release(self): @@ -3144,84 +3205,80 @@ class TestPBRSIntegration(RewardSpaceTestBase): "exit_potential_decay": 0.8, } # Expected: Φ' = Φ * (1 - decay) = 1 * (1 - 0.8) = 0.2 - val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0) + val = _compute_exit_potential(1.0, params) self.assertAlmostEqual(val, 0.2) def test_exit_potential_spike_cancel(self): """Spike cancel: Φ' = Φ / γ (inversion).""" - params = {"exit_potential_mode": "spike_cancel", "potential_gamma": 0.95} - val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0) + params = { + "exit_potential_mode": "spike_cancel", + "potential_gamma": DEFAULT_MODEL_REWARD_PARAMETERS["potential_gamma"], + } + val = _compute_exit_potential(1.0, params) self.assertAlmostEqual(val, 1.0 / 0.95, places=7) def test_exit_potential_retain_previous(self): """Test exit potential in retain previous mode.""" params = {"exit_potential_mode": "retain_previous"} - val = _compute_exit_potential(0.5, 0.3, params, last_potential=1.0) + val = _compute_exit_potential(1.0, params) self.assertEqual(val, 1.0) def test_pbrs_terminal_canonical(self): - """Test PBRS behavior in canonical mode with terminal state.""" - params = { - "potential_gamma": 0.95, - "hold_potential_enabled": True, - "hold_potential_scale": 1.0, - "hold_potential_gain": 1.0, - "hold_potential_transform_pnl": "tanh", - "hold_potential_transform_duration": "tanh", - "exit_potential_mode": "canonical", - "entry_additive_enabled": False, - "exit_additive_enabled": False, - } + """Canonical mode structural invariance checks. - current_pnl = 0.5 - current_duration_ratio = 0.3 - expected_current_potential = _compute_hold_potential( - current_pnl, current_duration_ratio, params - ) - - total_reward, shaping_reward, next_potential = apply_potential_shaping( - base_reward=100.0, - current_pnl=current_pnl, - current_duration_ratio=current_duration_ratio, - next_pnl=0.0, - next_duration_ratio=0.0, - is_terminal=True, - last_potential=0.0, - params=params, + We do NOT enforce Σ shaping ≈ 0 here (finite-horizon drift acceptable). 
We verify: + - Additive components auto-disabled (policy invariance enforcement) + - Terminal potential always zero (Φ_terminal = 0) + - Shaping magnitudes bounded (safety guard against spikes) + """ + params = DEFAULT_MODEL_REWARD_PARAMETERS.copy() + params.update( + { + "exit_potential_mode": "canonical", + "hold_potential_enabled": True, + "entry_additive_enabled": False, + "exit_additive_enabled": False, + } ) - - # Terminal potential should be 0 in canonical mode - self.assertEqual(next_potential, 0.0) - # Shaping reward should be negative (releasing potential) - self.assertTrue(shaping_reward < 0) - # Check exact formula: γΦ(s') - Φ(s) = 0.95 * 0 - expected_current_potential - expected_shaping = 0.95 * 0.0 - expected_current_potential - self.assertAlmostEqual(shaping_reward, expected_shaping, delta=1e-10) - - def test_pbrs_invalid_gamma(self): - """Test PBRS with invalid gamma value.""" - params = {"potential_gamma": 1.5, "hold_potential_enabled": True} - with self.assertRaises(ValueError): - apply_potential_shaping( + rng = np.random.default_rng(123) + last_potential = 0.0 + terminal_next_potentials: list[float] = [] + shaping_values: list[float] = [] + current_pnl = 0.0 + current_dur = 0.0 + for _ in range(350): + is_terminal = rng.uniform() < 0.08 + # Sample candidate next state (ignored for terminal potential because canonical sets Φ'=0) + next_pnl = 0.0 if is_terminal else float(rng.normal(0, 0.2)) + inc = rng.uniform(0, 0.12) + next_dur = 0.0 if is_terminal else float(min(1.0, current_dur + inc)) + _total, shaping, next_potential = apply_potential_shaping( base_reward=0.0, - current_pnl=0.0, - current_duration_ratio=0.0, - next_pnl=0.0, - next_duration_ratio=0.0, - is_terminal=True, - last_potential=0.0, + current_pnl=current_pnl, + current_duration_ratio=current_dur, + next_pnl=next_pnl, + next_duration_ratio=next_dur, + is_terminal=is_terminal, + last_potential=last_potential, params=params, ) - - def test_calculate_reward_with_pbrs_integration(self): - """Test that PBRS parameters are properly integrated in defaults.""" - # Test that PBRS parameters are in the default parameters - for param in PBRS_INTEGRATION_PARAMS: - self.assertIn( - param, - DEFAULT_MODEL_REWARD_PARAMETERS, - f"PBRS parameter {param} missing from defaults", - ) + shaping_values.append(shaping) + if is_terminal: + terminal_next_potentials.append(next_potential) + last_potential = 0.0 + current_pnl = 0.0 + current_dur = 0.0 + else: + last_potential = next_potential + current_pnl = next_pnl + current_dur = next_dur + # Assertions + self.assertFalse(params["entry_additive_enabled"]) # enforced + self.assertFalse(params["exit_additive_enabled"]) # enforced + if terminal_next_potentials: + self.assertTrue(all(abs(p) < 1e-12 for p in terminal_next_potentials)) + max_abs = max(abs(v) for v in shaping_values) if shaping_values else 0.0 + self.assertLessEqual(max_abs, 5.0) # Test basic PBRS function integration works params = {"hold_potential_enabled": True, "hold_potential_scale": 1.0} @@ -3237,6 +3294,155 @@ class TestPBRSIntegration(RewardSpaceTestBase): f"Missing PBRS parameter: {param}", ) + # --- End of structural PBRS tests (legacy sum test removed) --- + + def test_pbrs_progressive_release_decay_clamped_zero_current(self): + """progressive_release with decay>1 clamps to 1 (full release to 0). + + Scenario isolates current Φ(s)=0 and non-zero previous potential carried in last_potential, + verifying clamp logic independent of current state potential. 
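+
+        Expected arithmetic: decay=5.0 is clamped to 1.0, so Φ' = 0.8 * (1 - 1.0) = 0.0
+        and shaping = γ·Φ' - Φ(s) = 0.0 - 0.0 = 0.0.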
+ """ + params = DEFAULT_MODEL_REWARD_PARAMETERS.copy() + params.update( + { + "exit_potential_mode": "progressive_release", + "exit_potential_decay": 5.0, # will clamp to 1.0 + } + ) + _total, shaping, next_potential = apply_potential_shaping( + base_reward=0.0, + current_pnl=0.0, + current_duration_ratio=0.0, + next_pnl=0.0, + next_duration_ratio=0.0, + is_terminal=True, + last_potential=0.8, + params=params, + ) + # After clamp: next_potential = 0 and shaping = γ*0 - 0 (Φ(s)=0) => 0 + self.assertAlmostEqualFloat(next_potential, 0.0, tolerance=1e-12) + self.assertAlmostEqualFloat(shaping, 0.0, tolerance=1e-9) + + def test_pbrs_invalid_transform_fallback_potential(self): + """Invalid transform name in potential path must fall back without NaNs.""" + params = DEFAULT_MODEL_REWARD_PARAMETERS.copy() + params.update({"hold_potential_transform_pnl": "not_a_transform"}) + _total, shaping, next_potential = apply_potential_shaping( + base_reward=0.0, + current_pnl=0.2, + current_duration_ratio=0.3, + next_pnl=0.25, + next_duration_ratio=0.35, + is_terminal=False, + last_potential=0.0, + params=params, + ) + self.assertTrue(np.isfinite(shaping)) + self.assertTrue(np.isfinite(next_potential)) + + def test_pbrs_gamma_zero_disables_progressive_effect(self): + """Gamma=0 => shaping term = -Φ(s); potential still updated for next step (Φ' used only if γ>0). + + We only assert shaping bounded since Φ is transformed & scaled; ensures stability without exceptions. + """ + params = DEFAULT_MODEL_REWARD_PARAMETERS.copy() + params.update({"potential_gamma": 0.0, "hold_potential_enabled": True}) + base_reward = 1.23 + _total, shaping, next_potential = apply_potential_shaping( + base_reward=base_reward, + current_pnl=0.1, + current_duration_ratio=0.2, + next_pnl=0.15, + next_duration_ratio=0.25, + is_terminal=False, + last_potential=0.0, + params=params, + ) + # Shaping bounded (Φ in [-scale, scale] after transforms), conservatively check |shaping| <= 1 + self.assertLessEqual(abs(shaping), 1.0) + self.assertTrue(np.isfinite(next_potential)) + + +class TestReportFormatting(RewardSpaceTestBase): + """Tests for report formatting elements not previously covered.""" + + def test_abs_shaping_line_present_and_constant(self): + """Ensure the report contains the Abs Σ Shaping Reward line and tolerance not hard-coded elsewhere. + + Strategy: run a minimal simulation via generate_report setUp artifact. + We reuse helper simulate_samples (already tested) to produce output then search in report. + """ + # Generate a tiny dataset directly using existing pathway by invoking analysis main logic would be heavy. + # Instead we mimic the invariance block logic locally to ensure formatting; this keeps test fast and stable. + # Create a temporary markdown file to exercise the function that writes invariance section. 
+        # Sanity check tolerance value
+        self.assertAlmostEqual(PBRS_INVARIANCE_TOL, 1e-6, places=12)
+
+        # Use small synthetic DataFrame with zero shaping sum (pandas imported globally)
+        df = pd.DataFrame(
+            {
+                "reward_shaping": [1e-12, -1e-12],
+                "reward_entry_additive": [0.0, 0.0],
+                "reward_exit_additive": [0.0, 0.0],
+            }
+        )
+        total_shaping = df["reward_shaping"].sum()
+        self.assertTrue(abs(total_shaping) < PBRS_INVARIANCE_TOL)
+        # Reconstruct lines exactly as writer does
+        lines = [
+            f"| Abs Σ Shaping Reward | {abs(total_shaping):.6e} |",
+        ]
+        content = "\n".join(lines)
+        # Validate formatting pattern using regex
+        m = re.search(
+            r"\| Abs Σ Shaping Reward \| ([0-9]+\.[0-9]{6}e[+-][0-9]{2}) \|", content
+        )
+        self.assertIsNotNone(m, "Abs Σ Shaping Reward line missing or misformatted")
+        # Ensure scientific notation magnitude consistent with small number
+        val = float(m.group(1)) if m else None  # type: ignore[arg-type]
+        if val is not None:
+            self.assertLess(val, 1e-8 + 1e-12)
+        # Ensure no stray hard-coded tolerance string inside content
+        self.assertNotIn(
+            "1e-6", content, "Tolerance should not be hard-coded in line output"
+        )
+
+    def test_pbrs_non_canonical_report_generation(self):
+        """Build a synthetic invariance section with non-zero shaping and assert the Non-canonical classification."""
+        df = pd.DataFrame(
+            {
+                "reward_shaping": [0.01, -0.002],  # sum = 0.008 (well above tolerance)
+                "reward_entry_additive": [0.0, 0.0],
+                "reward_exit_additive": [0.001, 0.0],
+            }
+        )
+        total_shaping = df["reward_shaping"].sum()
+        self.assertGreater(abs(total_shaping), PBRS_INVARIANCE_TOL)
+        invariance_status = "❌ Non-canonical"
+        section = []
+        section.append("**PBRS Invariance Summary:**\n")
+        section.append("| Field | Value |\n")
+        section.append("|-------|-------|\n")
+        section.append(f"| Invariance | {invariance_status} |\n")
+        section.append(f"| Note | Total shaping = {total_shaping:.6f} (non-zero) |\n")
+        section.append(f"| Σ Shaping Reward | {total_shaping:.6f} |\n")
+        section.append(f"| Abs Σ Shaping Reward | {abs(total_shaping):.6e} |\n")
+        section.append(
+            f"| Σ Entry Additive | {df['reward_entry_additive'].sum():.6f} |\n"
+        )
+        section.append(
+            f"| Σ Exit Additive | {df['reward_exit_additive'].sum():.6f} |\n"
+        )
+        content = "".join(section)
+        self.assertIn("❌ Non-canonical", content)
+        self.assertRegex(content, r"Σ Shaping Reward \| 0\.008000 \|")
+        m_abs = re.search(r"Abs Σ Shaping Reward \| ([0-9.]+e[+-][0-9]{2}) \|", content)
+        self.assertIsNotNone(m_abs)
+        if m_abs:
+            self.assertAlmostEqual(abs(total_shaping), float(m_abs.group(1)), places=12)
+
 
 if __name__ == "__main__":
     # Configure test discovery and execution
diff --git a/ReforceXY/user_data/config-template.json b/ReforceXY/user_data/config-template.json
index f98c23a..575819c 100644
--- a/ReforceXY/user_data/config-template.json
+++ b/ReforceXY/user_data/config-template.json
@@ -158,7 +158,7 @@
         "model_type": "MaskablePPO",
         "policy_type": "MlpPolicy",
         "model_reward_parameters": {
-            "rr": 1,
+            "rr": 2,
             "profit_aim": 0.025,
             "win_reward_factor": 2
         },
diff --git a/ReforceXY/user_data/freqaimodels/ReforceXY.py b/ReforceXY/user_data/freqaimodels/ReforceXY.py
index 93524eb..685fea3 100644
--- a/ReforceXY/user_data/freqaimodels/ReforceXY.py
+++ b/ReforceXY/user_data/freqaimodels/ReforceXY.py
@@ -1386,6 +1386,15 @@ class MyRLEnv(Base5ActionRLEnv):
             self._potential_gamma = 0.95
         else:
             self._potential_gamma = float(potential_gamma)
+        # Validate potential_gamma range (0 <= gamma <= 1)
+        if not (0.0 <= 
self._potential_gamma <= 1.0): + original_gamma = self._potential_gamma + self._potential_gamma = min(1.0, max(0.0, self._potential_gamma)) + logger.warning( + "potential_gamma=%s is outside [0,1]; clamped to %s", + original_gamma, + self._potential_gamma, + ) self._potential_softsign_sharpness: float = float( model_reward_parameters.get("potential_softsign_sharpness", 1.0) ) @@ -1479,6 +1488,13 @@ class MyRLEnv(Base5ActionRLEnv): self._entry_additive_enabled = False self._exit_additive_enabled = False + if MyRLEnv.is_unsupported_pbrs_config( + self._hold_potential_enabled, getattr(self, "add_state_info", False) + ): + logger.warning( + "PBRS: hold_potential_enabled=True & add_state_info=False is unsupported. PBRS invariance is not guaranteed" + ) + def _get_next_position(self, action: int) -> Positions: if action == Actions.Long_enter.value and self._position == Positions.Neutral: return Positions.Long @@ -1767,6 +1783,18 @@ class MyRLEnv(Base5ActionRLEnv): self._entry_additive_enabled or self._exit_additive_enabled ) + @staticmethod + def is_unsupported_pbrs_config( + hold_potential_enabled: bool, add_state_info: bool + ) -> bool: + """Return True if PBRS potential relies on hidden (non-observed) state. + + Case: hold_potential enabled while auxiliary state info (pnl, trade_duration) is excluded + from the observation space (add_state_info=False). In that situation, Φ(s) uses hidden + variables and PBRS becomes informative, voiding the strict policy invariance guarantee. + """ + return hold_potential_enabled and not add_state_info + def _apply_potential_shaping( self, base_reward: float, @@ -2175,15 +2203,23 @@ class MyRLEnv(Base5ActionRLEnv): model_reward_parameters = self.rl_config.get("model_reward_parameters", {}) pnl_target_factor = 1.0 - if pnl_target > 0.0 and pnl > pnl_target: - win_reward_factor = float( - model_reward_parameters.get("win_reward_factor", 2.0) - ) + if pnl_target > 0.0: pnl_factor_beta = float(model_reward_parameters.get("pnl_factor_beta", 0.5)) pnl_ratio = pnl / pnl_target - pnl_target_factor = 1.0 + win_reward_factor * math.tanh( - pnl_factor_beta * (pnl_ratio - 1.0) - ) + if abs(pnl_ratio) > 1.0: + base_pnl_target_factor = math.tanh( + pnl_factor_beta * (abs(pnl_ratio) - 1.0) + ) + win_reward_factor = float( + model_reward_parameters.get("win_reward_factor", 2.0) + ) + if pnl_ratio > 1.0: + pnl_target_factor = 1.0 + win_reward_factor * base_pnl_target_factor + elif pnl_ratio < -(1.0 / self.rr): + loss_penalty_factor = win_reward_factor * self.rr + pnl_target_factor = ( + 1.0 + loss_penalty_factor * base_pnl_target_factor + ) efficiency_factor = 1.0 efficiency_weight = float(model_reward_parameters.get("efficiency_weight", 1.0))
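
For illustration only (not part of the patch): a minimal standalone sketch of the
reworked risk/reward-aware exit factor in the hunk above. The sample values
(pnl_target=0.05, rr=2.0) are assumptions; win_reward_factor=2.0 and
pnl_factor_beta=0.5 follow the defaults read by the hunk.

    import math

    def pnl_target_factor(pnl: float, pnl_target: float, rr: float,
                          win_reward_factor: float = 2.0,
                          pnl_factor_beta: float = 0.5) -> float:
        # Mirrors the hunk above: amplify exits beyond the profit target (win)
        # or beyond -target/rr (loss), both attenuated through tanh.
        factor = 1.0
        if pnl_target > 0.0:
            pnl_ratio = pnl / pnl_target
            if abs(pnl_ratio) > 1.0:
                base = math.tanh(pnl_factor_beta * (abs(pnl_ratio) - 1.0))
                if pnl_ratio > 1.0:
                    factor = 1.0 + win_reward_factor * base
                elif pnl_ratio < -(1.0 / rr):
                    # Loss penalty scales with rr: a higher risk/reward ratio
                    # punishes oversized losses harder.
                    factor = 1.0 + (win_reward_factor * rr) * base
        return factor

    print(pnl_target_factor(0.10, 0.05, rr=2.0))   # win beyond target: ~1.924
    print(pnl_target_factor(-0.10, 0.05, rr=2.0))  # loss beyond -target/rr: ~2.848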