Piment Noir Git Repositories - freqai-strategies.git/commitdiff
test(reforcexy): tests cleanups
author Jérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 22 Oct 2025 21:37:32 +0000 (23:37 +0200)
committer Jérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 22 Oct 2025 21:37:32 +0000 (23:37 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
ReforceXY/reward_space_analysis/pyproject.toml
ReforceXY/reward_space_analysis/tests/__init__.py
ReforceXY/reward_space_analysis/tests/test_api_helpers.py
ReforceXY/reward_space_analysis/tests/test_pbrs.py
ReforceXY/reward_space_analysis/tests/test_reward_components.py
ReforceXY/reward_space_analysis/tests/test_robustness.py
ReforceXY/reward_space_analysis/tests/test_statistics.py
ReforceXY/reward_space_analysis/tests/test_utilities.py
ReforceXY/reward_space_analysis/uv.lock

ReforceXY/reward_space_analysis/pyproject.toml
index 831be3ee0f57cd4c0baf70331101aa2d24fb00dc..54a7839023d26997256655fa82b1072f37decae7 100644 (file)
@@ -20,6 +20,7 @@ dev = [
     "pytest>=6.0",
     "ruff",
     "coverage",
+    "pytest-cov>=7.0.0",
 ]
 
 [build-system]
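A minimal usage sketch for the pytest-cov dev dependency added above (the invocation is assumed, not part of the commit; the package name comes from the test imports and the path from the file list at the top):

    # hypothetical coverage run over the reward_space_analysis test suite
    import pytest

    pytest.main([
        "--cov=reward_space_analysis",            # pytest-cov: measure coverage of the analysed module
        "--cov-report=term-missing",              # pytest-cov: list uncovered lines in the terminal
        "ReforceXY/reward_space_analysis/tests",  # test package from the file list above
    ])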
ReforceXY/reward_space_analysis/tests/__init__.py
index 64cf7d7978d5d2b963268343eaf0499a0058176e..f8d632cd8de358d83ffe587a9460fd393cbf6326 100644 (file)
@@ -7,7 +7,6 @@ from .test_reward_components import TestRewardComponents
 from .test_robustness import TestRewardRobustnessAndBoundaries
 from .test_statistics import TestStatistics
 from .test_utilities import (
-    TestBootstrapStatistics,
     TestCsvAndSimulationOptions,
     TestLoadRealEpisodes,
     TestParamsPropagation,
@@ -23,7 +22,6 @@ __all__ = [
     "TestPrivateFunctions",
     "TestRewardRobustnessAndBoundaries",
     "TestLoadRealEpisodes",
-    "TestBootstrapStatistics",
     "TestReportFormatting",
     "TestCsvAndSimulationOptions",
     "TestParamsPropagation",
ReforceXY/reward_space_analysis/tests/test_api_helpers.py
index fa4bac24c7e193d9e742059d2f6f09d380029ba7..5ebb0471bff9bcb6b0937ed7413e31339e00a7b1 100644 (file)
@@ -30,6 +30,7 @@ class TestAPIAndHelpers(RewardSpaceTestBase):
     """Public API + helper utility tests."""
 
     def test_parse_overrides(self):
+        """Test parse overrides."""
         overrides = ["alpha=1.5", "mode=linear", "limit=42"]
         result = parse_overrides(overrides)
         self.assertEqual(result["alpha"], 1.5)
@@ -39,6 +40,7 @@ class TestAPIAndHelpers(RewardSpaceTestBase):
             parse_overrides(["badpair"])
 
     def test_api_simulation_and_reward_smoke(self):
+        """Test api simulation and reward smoke."""
         df = simulate_samples(
             params=self.base_params(max_trade_duration_candles=40),
             num_samples=20,
@@ -250,68 +252,6 @@ class TestAPIAndHelpers(RewardSpaceTestBase):
 class TestPrivateFunctions(RewardSpaceTestBase):
     """Test private functions through public API calls."""
 
-    def test_idle_penalty_via_rewards(self):
-        """Test idle penalty calculation via reward calculation."""
-        context = self.make_ctx(
-            pnl=0.0,
-            trade_duration=0,
-            idle_duration=20,
-            max_unrealized_profit=0.0,
-            min_unrealized_profit=0.0,
-            position=Positions.Neutral,
-            action=Actions.Neutral,
-        )
-        breakdown = calculate_reward(
-            context,
-            self.DEFAULT_PARAMS,
-            base_factor=self.TEST_BASE_FACTOR,
-            profit_target=self.TEST_PROFIT_TARGET,
-            risk_reward_ratio=1.0,
-            short_allowed=True,
-            action_masking=True,
-        )
-        self.assertLess(breakdown.idle_penalty, 0, "Idle penalty should be negative")
-        self.assertAlmostEqualFloat(
-            breakdown.total,
-            breakdown.idle_penalty
-            + breakdown.reward_shaping
-            + breakdown.entry_additive
-            + breakdown.exit_additive,
-            tolerance=self.TOL_IDENTITY_RELAXED,
-            msg="Total should equal sum of components (idle + shaping/additives)",
-        )
-
-    def test_hold_penalty_via_rewards(self):
-        """Test hold penalty calculation via reward calculation."""
-        context = self.make_ctx(
-            pnl=0.01,
-            trade_duration=150,
-            idle_duration=0,
-            max_unrealized_profit=0.02,
-            min_unrealized_profit=0.0,
-            position=Positions.Long,
-            action=Actions.Neutral,
-        )
-        breakdown = calculate_reward(
-            context,
-            self.DEFAULT_PARAMS,
-            base_factor=self.TEST_BASE_FACTOR,
-            profit_target=self.TEST_PROFIT_TARGET,
-            risk_reward_ratio=self.TEST_RR,
-            short_allowed=True,
-            action_masking=True,
-        )
-        self.assertLess(breakdown.hold_penalty, 0, "Hold penalty should be negative")
-        self.assertAlmostEqualFloat(
-            breakdown.total,
-            breakdown.hold_penalty
-            + breakdown.reward_shaping
-            + breakdown.entry_additive
-            + breakdown.exit_additive,
-            tolerance=self.TOL_IDENTITY_RELAXED,
-            msg="Total should equal sum of components (hold + shaping/additives)",
-        )
-
     def test_exit_reward_calculation(self):
         """Test exit reward calculation with various scenarios."""
         scenarios = [
@@ -380,97 +320,6 @@ class TestPrivateFunctions(RewardSpaceTestBase):
             msg="Total should equal invalid penalty plus shaping/additives",
         )
 
-    def test_hold_penalty_zero_before_max_duration(self):
-        """Test hold penalty logic: zero penalty before max_trade_duration."""
-        max_duration = 128
-        test_cases = [
-            (64, "before max_duration"),
-            (127, "just before max_duration"),
-            (128, "exactly at max_duration"),
-            (129, "just after max_duration"),
-            (192, "well after max_duration"),
-        ]
-        for trade_duration, description in test_cases:
-            with self.subTest(duration=trade_duration, desc=description):
-                context = self.make_ctx(
-                    pnl=0.0,
-                    trade_duration=trade_duration,
-                    idle_duration=0,
-                    max_unrealized_profit=0.0,
-                    min_unrealized_profit=0.0,
-                    position=Positions.Long,
-                    action=Actions.Neutral,
-                )
-                breakdown = calculate_reward(
-                    context,
-                    self.DEFAULT_PARAMS,
-                    base_factor=self.TEST_BASE_FACTOR,
-                    profit_target=self.TEST_PROFIT_TARGET,
-                    risk_reward_ratio=1.0,
-                    short_allowed=True,
-                    action_masking=True,
-                )
-                duration_ratio = trade_duration / max_duration
-                if duration_ratio < 1.0:
-                    self.assertEqual(
-                        breakdown.hold_penalty,
-                        0.0,
-                        f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})",
-                    )
-                elif duration_ratio == 1.0:
-                    self.assertEqual(
-                        breakdown.hold_penalty,
-                        0.0,
-                        f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})",
-                    )
-                else:
-                    self.assertLess(
-                        breakdown.hold_penalty,
-                        0.0,
-                        f"Hold penalty should be negative {description} (ratio={duration_ratio:.2f})",
-                    )
-                self.assertAlmostEqualFloat(
-                    breakdown.total,
-                    breakdown.hold_penalty
-                    + breakdown.reward_shaping
-                    + breakdown.entry_additive
-                    + breakdown.exit_additive,
-                    tolerance=self.TOL_IDENTITY_RELAXED,
-                    msg=f"Total mismatch including shaping {description}",
-                )
-
-    def test_hold_penalty_progressive_scaling(self):
-        """Test that hold penalty scales progressively after max_duration."""
-        params = self.base_params(max_trade_duration_candles=100)
-        durations = [150, 200, 300]
-        penalties: list[float] = []
-        for duration in durations:
-            context = self.make_ctx(
-                pnl=0.0,
-                trade_duration=duration,
-                idle_duration=0,
-                max_unrealized_profit=0.0,
-                min_unrealized_profit=0.0,
-                position=Positions.Long,
-                action=Actions.Neutral,
-            )
-            breakdown = calculate_reward(
-                context,
-                params,
-                base_factor=self.TEST_BASE_FACTOR,
-                profit_target=self.TEST_PROFIT_TARGET,
-                risk_reward_ratio=self.TEST_RR,
-                short_allowed=True,
-                action_masking=True,
-            )
-            penalties.append(breakdown.hold_penalty)
-        for i in range(1, len(penalties)):
-            self.assertLessEqual(
-                penalties[i],
-                penalties[i - 1],
-                f"Penalty should increase with duration: {penalties[i]} > {penalties[i - 1]}",
-            )
-
     def test_new_invariant_and_warn_parameters(self):
         """Ensure new tunables (check_invariants, exit_factor_threshold) exist and behave.
 
ReforceXY/reward_space_analysis/tests/test_pbrs.py
index fe4e80a7fa38c7705315f8235aad306f48b3d324..73b9d4f824e4e58081c2dab88a88e898558ad2cf 100644 (file)
@@ -8,6 +8,7 @@ import numpy as np
 
 from reward_space_analysis import (
     DEFAULT_MODEL_REWARD_PARAMETERS,
+    PBRS_INVARIANCE_TOL,
     _compute_entry_additive,
     _compute_exit_additive,
     _compute_exit_potential,
@@ -15,6 +16,7 @@ from reward_space_analysis import (
     _get_float_param,
     apply_potential_shaping,
     apply_transform,
+    simulate_samples,
     validate_reward_parameters,
 )
 
@@ -109,6 +111,65 @@ class TestPBRS(RewardSpaceTestBase):
         self.assertTrue(abs(apply_transform("softsign", 100.0)) < 1.0)
         self.assertTrue(abs(apply_transform("softsign", -100.0)) < 1.0)
 
+    def test_canonical_invariance_flag_and_sum(self):
+        """Canonical mode + no additives -> pbrs_invariant True and Σ shaping ≈ 0."""
+        params = self.base_params(
+            exit_potential_mode="canonical",
+            entry_additive_enabled=False,
+            exit_additive_enabled=False,
+            hold_potential_enabled=True,
+        )
+        df = simulate_samples(
+            params={**params, "max_trade_duration_candles": 100},
+            num_samples=400,
+            seed=self.SEED,
+            base_factor=self.TEST_BASE_FACTOR,
+            profit_target=self.TEST_PROFIT_TARGET,
+            risk_reward_ratio=self.TEST_RR,
+            max_duration_ratio=2.0,
+            trading_mode="margin",
+            pnl_base_std=self.TEST_PNL_STD,
+            pnl_duration_vol_scale=self.TEST_PNL_DUR_VOL_SCALE,
+        )
+        unique_flags = set(df["pbrs_invariant"].unique().tolist())
+        self.assertEqual(unique_flags, {True}, f"Unexpected invariant flags: {unique_flags}")
+        total_shaping = float(df["reward_shaping"].sum())
+        self.assertLess(
+            abs(total_shaping),
+            PBRS_INVARIANCE_TOL,
+            f"Canonical invariance violated: Σ shaping = {total_shaping}",
+        )
+
+    def test_non_canonical_flag_false_and_sum_nonzero(self):
+        """Non-canonical exit potential (progressive_release) -> pbrs_invariant False and Σ shaping != 0."""
+        params = self.base_params(
+            exit_potential_mode="progressive_release",
+            exit_potential_decay=0.25,
+            entry_additive_enabled=False,
+            exit_additive_enabled=False,
+            hold_potential_enabled=True,
+        )
+        df = simulate_samples(
+            params={**params, "max_trade_duration_candles": 100},
+            num_samples=400,
+            seed=self.SEED,
+            base_factor=self.TEST_BASE_FACTOR,
+            profit_target=self.TEST_PROFIT_TARGET,
+            risk_reward_ratio=self.TEST_RR,
+            max_duration_ratio=2.0,
+            trading_mode="margin",
+            pnl_base_std=self.TEST_PNL_STD,
+            pnl_duration_vol_scale=self.TEST_PNL_DUR_VOL_SCALE,
+        )
+        unique_flags = set(df["pbrs_invariant"].unique().tolist())
+        self.assertEqual(unique_flags, {False}, f"Unexpected invariant flags: {unique_flags}")
+        total_shaping = float(df["reward_shaping"].sum())
+        self.assertGreater(
+            abs(total_shaping),
+            PBRS_INVARIANCE_TOL * 10,
+            f"Expected non-zero Σ shaping in non-canonical mode (got {total_shaping})",
+        )
+
     def test_asinh_transform(self):
         """asinh transform: x / sqrt(1 + x^2) in (-1, 1)."""
         self.assertAlmostEqualFloat(apply_transform("asinh", 0.0), 0.0)
@@ -154,31 +215,20 @@ class TestPBRS(RewardSpaceTestBase):
             tolerance=self.TOL_IDENTITY_RELAXED,
         )
 
-    def test_hold_potential_basic(self):
-        """Test basic hold potential calculation."""
-        params = {
-            "hold_potential_enabled": True,
-            "hold_potential_scale": 1.0,
-            "hold_potential_gain": 1.0,
-            "hold_potential_transform_pnl": "tanh",
-            "hold_potential_transform_duration": "tanh",
-        }
-        val = _compute_hold_potential(0.5, 0.3, params)
-        self.assertFinite(val, name="hold_potential")
-
-    def test_entry_additive_disabled(self):
-        """Test entry additive when disabled."""
-        params = {"entry_additive_enabled": False}
-        val = _compute_entry_additive(0.5, 0.3, params)
-        self.assertEqual(val, 0.0)
-
-    def test_exit_additive_disabled(self):
-        """Test exit additive when disabled."""
-        params = {"exit_additive_enabled": False}
-        val = _compute_exit_additive(0.5, 0.3, params)
-        self.assertEqual(val, 0.0)
+    def test_additive_components_disabled_return_zero(self):
+        """Test entry and exit additives return zero when disabled."""
+        # Test entry additive disabled
+        params_entry = {"entry_additive_enabled": False}
+        val_entry = _compute_entry_additive(0.5, 0.3, params_entry)
+        self.assertEqual(val_entry, 0.0)
+
+        # Test exit additive disabled
+        params_exit = {"exit_additive_enabled": False}
+        val_exit = _compute_exit_additive(0.5, 0.3, params_exit)
+        self.assertEqual(val_exit, 0.0)
 
     def test_exit_potential_canonical(self):
+        """Test exit potential canonical."""
         params = self.base_params(
             exit_potential_mode="canonical",
             hold_potential_enabled=True,
@@ -447,6 +497,154 @@ class TestPBRS(RewardSpaceTestBase):
         self.assertAlmostEqualFloat(s_base, s_scaled, tolerance=self.TOL_DISTRIB_SHAPE)
         self.assertAlmostEqualFloat(k_base, k_scaled, tolerance=self.TOL_DISTRIB_SHAPE)
 
+    def test_pbrs_non_canonical_report_generation(self):
+        """Generate synthetic invariance section with non-zero shaping to assert Non-canonical classification."""
+        import re
+
+        import pandas as pd
+
+        from reward_space_analysis import PBRS_INVARIANCE_TOL
+
+        df = pd.DataFrame(
+            {
+                "reward_shaping": [0.01, -0.002],
+                "reward_entry_additive": [0.0, 0.0],
+                "reward_exit_additive": [0.001, 0.0],
+            }
+        )
+        total_shaping = df["reward_shaping"].sum()
+        self.assertGreater(abs(total_shaping), PBRS_INVARIANCE_TOL)
+        invariance_status = "❌ Non-canonical"
+        section = []
+        section.append("**PBRS Invariance Summary:**\n")
+        section.append("| Field | Value |\n")
+        section.append("|-------|-------|\n")
+        section.append(f"| Invariance | {invariance_status} |\n")
+        section.append(f"| Note | Total shaping = {total_shaping:.6f} (non-zero) |\n")
+        section.append(f"| Σ Shaping Reward | {total_shaping:.6f} |\n")
+        section.append(f"| Abs Σ Shaping Reward | {abs(total_shaping):.6e} |\n")
+        section.append(f"| Σ Entry Additive | {df['reward_entry_additive'].sum():.6f} |\n")
+        section.append(f"| Σ Exit Additive | {df['reward_exit_additive'].sum():.6f} |\n")
+        content = "".join(section)
+        self.assertIn("❌ Non-canonical", content)
+        self.assertRegex(content, "Σ Shaping Reward \\| 0\\.008000 \\|")
+        m_abs = re.search("Abs Σ Shaping Reward \\| ([0-9.]+e[+-][0-9]{2}) \\|", content)
+        self.assertIsNotNone(m_abs)
+        if m_abs:
+            val = float(m_abs.group(1))
+            self.assertAlmostEqual(abs(total_shaping), val, places=12)
+
+    def test_potential_gamma_boundary_values_stability(self):
+        """Test potential gamma boundary values (0 and ≈1) produce bounded shaping."""
+        for gamma in [0.0, 0.999999]:
+            params = self.base_params(
+                hold_potential_enabled=True,
+                entry_additive_enabled=False,
+                exit_additive_enabled=False,
+                exit_potential_mode="canonical",
+                potential_gamma=gamma,
+            )
+            _tot, shap, next_pot = apply_potential_shaping(
+                base_reward=0.0,
+                current_pnl=0.02,
+                current_duration_ratio=0.3,
+                next_pnl=0.025,
+                next_duration_ratio=0.35,
+                is_exit=False,
+                last_potential=0.0,
+                params=params,
+            )
+            self.assertTrue(np.isfinite(shap))
+            self.assertTrue(np.isfinite(next_pot))
+            self.assertLessEqual(abs(shap), self.PBRS_MAX_ABS_SHAPING)
+
+    def test_report_cumulative_invariance_aggregation(self):
+        """Canonical telescoping term: small per-step mean drift, bounded increments."""
+        params = self.base_params(
+            hold_potential_enabled=True,
+            entry_additive_enabled=False,
+            exit_additive_enabled=False,
+            exit_potential_mode="canonical",
+        )
+        gamma = _get_float_param(
+            params, "potential_gamma", DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95)
+        )
+        rng = np.random.default_rng(321)
+        last_potential = 0.0
+        telescoping_sum = 0.0
+        max_abs_step = 0.0
+        steps = 0
+        for _ in range(500):
+            is_exit = rng.uniform() < 0.1
+            current_pnl = float(rng.normal(0, 0.05))
+            current_dur = float(rng.uniform(0, 1))
+            next_pnl = 0.0 if is_exit else float(rng.normal(0, 0.05))
+            next_dur = 0.0 if is_exit else float(rng.uniform(0, 1))
+            _tot, _shap, next_potential = apply_potential_shaping(
+                base_reward=0.0,
+                current_pnl=current_pnl,
+                current_duration_ratio=current_dur,
+                next_pnl=next_pnl,
+                next_duration_ratio=next_dur,
+                is_exit=is_exit,
+                last_potential=last_potential,
+                params=params,
+            )
+            inc = gamma * next_potential - last_potential
+            telescoping_sum += inc
+            if abs(inc) > max_abs_step:
+                max_abs_step = abs(inc)
+            steps += 1
+            if is_exit:
+                last_potential = 0.0
+            else:
+                last_potential = next_potential
+        mean_drift = telescoping_sum / max(1, steps)
+        self.assertLess(
+            abs(mean_drift),
+            0.02,
+            f"Per-step telescoping drift too large (mean={mean_drift}, steps={steps})",
+        )
+        self.assertLessEqual(
+            max_abs_step,
+            self.PBRS_MAX_ABS_SHAPING,
+            f"Unexpected large telescoping increment (max={max_abs_step})",
+        )
+
+    def test_report_explicit_non_invariance_progressive_release(self):
+        """progressive_release should generally yield non-zero cumulative shaping (release leak)."""
+        params = self.base_params(
+            hold_potential_enabled=True,
+            entry_additive_enabled=False,
+            exit_additive_enabled=False,
+            exit_potential_mode="progressive_release",
+            exit_potential_decay=0.25,
+        )
+        rng = np.random.default_rng(321)
+        last_potential = 0.0
+        shaping_sum = 0.0
+        for _ in range(160):
+            is_exit = rng.uniform() < 0.15
+            next_pnl = 0.0 if is_exit else float(rng.normal(0, 0.07))
+            next_dur = 0.0 if is_exit else float(rng.uniform(0, 1))
+            _tot, shap, next_pot = apply_potential_shaping(
+                base_reward=0.0,
+                current_pnl=float(rng.normal(0, 0.07)),
+                current_duration_ratio=float(rng.uniform(0, 1)),
+                next_pnl=next_pnl,
+                next_duration_ratio=next_dur,
+                is_exit=is_exit,
+                last_potential=last_potential,
+                params=params,
+            )
+            shaping_sum += shap
+            last_potential = 0.0 if is_exit else next_pot
+        self.assertGreater(
+            abs(shaping_sum),
+            PBRS_INVARIANCE_TOL * 50,
+            f"Expected non-zero Σ shaping (got {shaping_sum})",
+        )
+
 
 if __name__ == "__main__":
     unittest.main()
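For reference, the canonical-mode assertions above rest on the standard PBRS telescoping identity (Ng et al., 1999), with Φ the potential and γ = potential_gamma:

    Σ_{t=0..T-1} γ^t · (γ·Φ(s_{t+1}) − Φ(s_t)) = γ^T·Φ(s_T) − Φ(s_0)

When the potential is forced to zero at episode boundaries (Φ(s_0) = Φ(s_T) = 0, as in canonical exit mode), the right-hand side vanishes and the shaping terms cancel over an episode; this is the property the Σ reward_shaping ≈ 0 checks approximate up to PBRS_INVARIANCE_TOL. Modes such as progressive_release deliberately retain a residual potential at exit and therefore leak a non-zero Σ shaping.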
ReforceXY/reward_space_analysis/tests/test_reward_components.py
index 074b8aad4f4702ba518a0b28287739c9686f00c0..a9c3864b398e9823fb7bbefaa61b1fe6f83e37a7 100644 (file)
@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 """Tests for reward calculation components and algorithms."""
 
+import dataclasses
 import math
 import unittest
 
@@ -8,7 +9,9 @@ from reward_space_analysis import (
     Actions,
     Positions,
     RewardContext,
+    _compute_hold_potential,
     _get_exit_factor,
+    _get_float_param,
     _get_pnl_factor,
     calculate_reward,
 )
@@ -17,10 +20,162 @@ from .test_base import RewardSpaceTestBase
 
 
 class TestRewardComponents(RewardSpaceTestBase):
+    def test_hold_potential_computation_finite(self):
+        """Test hold potential computation returns finite values."""
+        params = {
+            "hold_potential_enabled": True,
+            "hold_potential_scale": 1.0,
+            "hold_potential_gain": 1.0,
+            "hold_potential_transform_pnl": "tanh",
+            "hold_potential_transform_duration": "tanh",
+        }
+        val = _compute_hold_potential(0.5, 0.3, params)
+        self.assertFinite(val, name="hold_potential")
+
+    def test_hold_penalty_comprehensive(self):
+        """Comprehensive hold penalty test: calculation, thresholds, and progressive scaling."""
+        # Test 1: Basic hold penalty calculation via reward calculation (trade_duration > max_duration)
+        context = self.make_ctx(
+            pnl=0.01,
+            trade_duration=150,  # > default max_duration (128)
+            idle_duration=0,
+            max_unrealized_profit=0.02,
+            min_unrealized_profit=0.0,
+            position=Positions.Long,
+            action=Actions.Neutral,
+        )
+        breakdown = calculate_reward(
+            context,
+            self.DEFAULT_PARAMS,
+            base_factor=self.TEST_BASE_FACTOR,
+            profit_target=self.TEST_PROFIT_TARGET,
+            risk_reward_ratio=self.TEST_RR,
+            short_allowed=True,
+            action_masking=True,
+        )
+        self.assertLess(breakdown.hold_penalty, 0, "Hold penalty should be negative")
+        self.assertAlmostEqualFloat(
+            breakdown.total,
+            breakdown.hold_penalty
+            + breakdown.reward_shaping
+            + breakdown.entry_additive
+            + breakdown.exit_additive,
+            tolerance=self.TOL_IDENTITY_RELAXED,
+            msg="Total should equal sum of components (hold + shaping/additives)",
+        )
+
+        # Test 2: Zero penalty before max_duration threshold
+        max_duration = 128
+        test_cases = [
+            (64, "before max_duration"),
+            (127, "just before max_duration"),
+            (128, "exactly at max_duration"),
+            (129, "just after max_duration"),
+        ]
+        for trade_duration, description in test_cases:
+            with self.subTest(duration=trade_duration, desc=description):
+                context = self.make_ctx(
+                    pnl=0.0,
+                    trade_duration=trade_duration,
+                    idle_duration=0,
+                    position=Positions.Long,
+                    action=Actions.Neutral,
+                )
+                breakdown = calculate_reward(
+                    context,
+                    self.DEFAULT_PARAMS,
+                    base_factor=self.TEST_BASE_FACTOR,
+                    profit_target=self.TEST_PROFIT_TARGET,
+                    risk_reward_ratio=1.0,
+                    short_allowed=True,
+                    action_masking=True,
+                )
+                duration_ratio = trade_duration / max_duration
+                if duration_ratio < 1.0:
+                    self.assertEqual(
+                        breakdown.hold_penalty,
+                        0.0,
+                        f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})",
+                    )
+                elif duration_ratio == 1.0:
+                    # At exact max duration, penalty can be 0.0 or slightly negative (implementation dependent)
+                    self.assertLessEqual(
+                        breakdown.hold_penalty,
+                        0.0,
+                        f"Hold penalty should be <= 0.0 {description} (ratio={duration_ratio:.2f})",
+                    )
+                else:
+                    # Beyond max duration, penalty should be strictly negative
+                    self.assertLess(
+                        breakdown.hold_penalty,
+                        0.0,
+                        f"Hold penalty should be negative {description} (ratio={duration_ratio:.2f})",
+                    )
+
+        # Test 3: Progressive scaling after max_duration
+        params = self.base_params(max_trade_duration_candles=100)
+        durations = [150, 200, 300]
+        penalties: list[float] = []
+        for duration in durations:
+            context = self.make_ctx(
+                pnl=0.0,
+                trade_duration=duration,
+                idle_duration=0,
+                position=Positions.Long,
+                action=Actions.Neutral,
+            )
+            breakdown = calculate_reward(
+                context,
+                params,
+                base_factor=self.TEST_BASE_FACTOR,
+                profit_target=self.TEST_PROFIT_TARGET,
+                risk_reward_ratio=self.TEST_RR,
+                short_allowed=True,
+                action_masking=True,
+            )
+            penalties.append(breakdown.hold_penalty)
+        for i in range(1, len(penalties)):
+            self.assertLessEqual(
+                penalties[i],
+                penalties[i - 1],
+                f"Penalty should increase (more negative) with duration: {penalties[i]} <= {penalties[i - 1]}",
+            )
+
+    def test_idle_penalty_via_rewards(self):
+        """Test idle penalty calculation via reward calculation."""
+        context = self.make_ctx(
+            pnl=0.0,
+            trade_duration=0,
+            idle_duration=20,
+            max_unrealized_profit=0.0,
+            min_unrealized_profit=0.0,
+            position=Positions.Neutral,
+            action=Actions.Neutral,
+        )
+        breakdown = calculate_reward(
+            context,
+            self.DEFAULT_PARAMS,
+            base_factor=self.TEST_BASE_FACTOR,
+            profit_target=self.TEST_PROFIT_TARGET,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        self.assertLess(breakdown.idle_penalty, 0, "Idle penalty should be negative")
+        self.assertAlmostEqualFloat(
+            breakdown.total,
+            breakdown.idle_penalty
+            + breakdown.reward_shaping
+            + breakdown.entry_additive
+            + breakdown.exit_additive,
+            tolerance=self.TOL_IDENTITY_RELAXED,
+            msg="Total should equal sum of components (idle + shaping/additives)",
+        )
+
     """Core reward component tests."""
 
-    def test_reward_calculation_scenarios_basic(self):
-        """Reward calculation scenarios: expected components become non-zero."""
+    def test_reward_calculation_component_activation(self):
+        """Test reward component activation: idle_penalty and exit_component trigger correctly."""
         test_cases = [
             (Positions.Neutral, Actions.Neutral, "idle_penalty"),
             (Positions.Long, Actions.Long_exit, "exit_component"),
@@ -53,6 +208,7 @@ class TestRewardComponents(RewardSpaceTestBase):
                 self.assertFinite(breakdown.total, name="breakdown.total")
 
     def test_efficiency_zero_policy(self):
+        """Test efficiency zero policy."""
         ctx = self.make_ctx(
             pnl=0.0,
             trade_duration=1,
@@ -68,6 +224,7 @@ class TestRewardComponents(RewardSpaceTestBase):
         self.assertAlmostEqualFloat(pnl_factor, 1.0, tolerance=self.TOL_GENERIC_EQ)
 
     def test_max_idle_duration_candles_logic(self):
+        """Test max idle duration candles logic."""
         params_small = self.base_params(max_idle_duration_candles=50)
         params_large = self.base_params(max_idle_duration_candles=200)
         base_factor = self.TEST_BASE_FACTOR
@@ -385,6 +542,64 @@ class TestRewardComponents(RewardSpaceTestBase):
                 f"Long/Short asymmetry pnl={pnl}: long={br_long.exit_component}, short={br_short.exit_component}",
             )
 
+    def test_idle_penalty_fallback_and_proportionality(self):
+        """Idle penalty fallback denominator & proportional scaling."""
+        params = self.base_params(max_idle_duration_candles=None, max_trade_duration_candles=100)
+        base_factor = 90.0
+        profit_target = self.TEST_PROFIT_TARGET
+        risk_reward_ratio = 1.0
+        ctx_a = self.make_ctx(
+            pnl=0.0,
+            trade_duration=0,
+            idle_duration=20,
+            position=Positions.Neutral,
+            action=Actions.Neutral,
+        )
+        ctx_b = dataclasses.replace(ctx_a, idle_duration=40)
+        br_a = calculate_reward(
+            ctx_a,
+            params,
+            base_factor=base_factor,
+            profit_target=profit_target,
+            risk_reward_ratio=risk_reward_ratio,
+            short_allowed=True,
+            action_masking=True,
+        )
+        br_b = calculate_reward(
+            ctx_b,
+            params,
+            base_factor=base_factor,
+            profit_target=profit_target,
+            risk_reward_ratio=risk_reward_ratio,
+            short_allowed=True,
+            action_masking=True,
+        )
+        self.assertLess(br_a.idle_penalty, 0.0)
+        self.assertLess(br_b.idle_penalty, 0.0)
+        ratio = br_b.idle_penalty / br_a.idle_penalty if br_a.idle_penalty != 0 else None
+        self.assertIsNotNone(ratio)
+        if ratio is not None:
+            self.assertAlmostEqualFloat(abs(ratio), 2.0, tolerance=0.2)
+        ctx_mid = dataclasses.replace(ctx_a, idle_duration=120)
+        br_mid = calculate_reward(
+            ctx_mid,
+            params,
+            base_factor=base_factor,
+            profit_target=profit_target,
+            risk_reward_ratio=risk_reward_ratio,
+            short_allowed=True,
+            action_masking=True,
+        )
+        self.assertLess(br_mid.idle_penalty, 0.0)
+        idle_penalty_scale = _get_float_param(params, "idle_penalty_scale", 0.5)
+        idle_penalty_power = _get_float_param(params, "idle_penalty_power", 1.025)
+        factor = _get_float_param(params, "base_factor", float(base_factor))
+        idle_factor = factor * (profit_target * risk_reward_ratio) / 4.0
+        observed_ratio = abs(br_mid.idle_penalty) / (idle_factor * idle_penalty_scale)
+        if observed_ratio > 0:
+            implied_D = 120 / observed_ratio ** (1 / idle_penalty_power)
+            self.assertAlmostEqualFloat(implied_D, 400.0, tolerance=20.0)
+
 
 if __name__ == "__main__":
     unittest.main()
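Spelled out, test_idle_penalty_fallback_and_proportionality above inverts the idle-penalty relation it assumes (a reading of the test's assertions, not a quote of the implementation):

    abs(idle_penalty) ≈ idle_factor * idle_penalty_scale * (idle_duration / D) ** idle_penalty_power
    idle_factor = base_factor * (profit_target * risk_reward_ratio) / 4.0
    implied_D = idle_duration / observed_ratio ** (1 / idle_penalty_power)

With max_idle_duration_candles=None and max_trade_duration_candles=100, asserting implied_D ≈ 400 ± 20 pins the fallback denominator D at roughly four times the trade-duration limit.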
ReforceXY/reward_space_analysis/tests/test_robustness.py
index d402f3d063e4125c1000f1505639b131a3a47b61..9ed3988a851bd524738d49addfa8dbda517e1899 100644 (file)
@@ -1,7 +1,6 @@
 #!/usr/bin/env python3
 """Robustness tests and boundary condition validation."""
 
-import dataclasses
 import math
 import unittest
 import warnings
@@ -15,7 +14,6 @@ from reward_space_analysis import (
     Positions,
     RewardContext,
     _get_exit_factor,
-    _get_float_param,
     _get_pnl_factor,
     calculate_reward,
     simulate_samples,
@@ -161,8 +159,9 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
         invalid_combinations = df[(df["pnl"].abs() <= self.EPS_BASE) & (df["reward_exit"] != 0)]
         self.assertEqual(len(invalid_combinations), 0)
 
-    def test_exit_factor_mathematical_formulas(self):
-        """Mathematical correctness of exit factor calculations across modes."""
+    def test_exit_factor_comprehensive(self):
+        """Comprehensive exit factor test: mathematical correctness and monotonic attenuation."""
+        # Part 1: Mathematical formulas validation
         context = self.make_ctx(
             pnl=0.05,
             trade_duration=50,
@@ -174,6 +173,8 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
         )
         params = self.DEFAULT_PARAMS.copy()
         duration_ratio = 50 / 100
+
+        # Test power mode
         params["exit_attenuation_mode"] = "power"
         params["exit_power_tau"] = 0.5
         params["exit_plateau"] = False
@@ -187,6 +188,8 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
             action_masking=True,
         )
         self.assertGreater(reward_power.exit_component, 0)
+
+        # Test half_life mode with mathematical validation
         params["exit_attenuation_mode"] = "half_life"
         params["exit_half_life"] = 0.5
         reward_half_life = calculate_reward(
@@ -212,6 +215,8 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
             tolerance=self.TOL_IDENTITY_RELAXED,
             msg="Half-life attenuation mismatch: observed vs expected",
         )
+
+        # Test linear mode
         params["exit_attenuation_mode"] = "linear"
         params["exit_linear_slope"] = 1.0
         reward_linear = calculate_reward(
@@ -232,62 +237,57 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
         unique_rewards = set((f"{r:.6f}" for r in rewards))
         self.assertGreater(len(unique_rewards), 1)
 
-    def test_idle_penalty_fallback_and_proportionality(self):
-        """Idle penalty fallback denominator & proportional scaling (robustness)."""
-        params = self.base_params(max_idle_duration_candles=None, max_trade_duration_candles=100)
-        base_factor = 90.0
-        profit_target = self.TEST_PROFIT_TARGET
-        risk_reward_ratio = 1.0
-        ctx_a = self.make_ctx(
-            pnl=0.0,
-            trade_duration=0,
-            idle_duration=20,
-            position=Positions.Neutral,
-            action=Actions.Neutral,
-        )
-        ctx_b = dataclasses.replace(ctx_a, idle_duration=40)
-        br_a = calculate_reward(
-            ctx_a,
-            params,
-            base_factor=base_factor,
-            profit_target=profit_target,
-            risk_reward_ratio=risk_reward_ratio,
-            short_allowed=True,
-            action_masking=True,
-        )
-        br_b = calculate_reward(
-            ctx_b,
-            params,
-            base_factor=base_factor,
-            profit_target=profit_target,
-            risk_reward_ratio=risk_reward_ratio,
-            short_allowed=True,
-            action_masking=True,
-        )
-        self.assertLess(br_a.idle_penalty, 0.0)
-        self.assertLess(br_b.idle_penalty, 0.0)
-        ratio = br_b.idle_penalty / br_a.idle_penalty if br_a.idle_penalty != 0 else None
-        self.assertIsNotNone(ratio)
-        self.assertAlmostEqualFloat(abs(ratio), 2.0, tolerance=0.2)
-        ctx_mid = dataclasses.replace(ctx_a, idle_duration=120)
-        br_mid = calculate_reward(
-            ctx_mid,
-            params,
-            base_factor=base_factor,
-            profit_target=profit_target,
-            risk_reward_ratio=risk_reward_ratio,
-            short_allowed=True,
-            action_masking=True,
-        )
-        self.assertLess(br_mid.idle_penalty, 0.0)
-        idle_penalty_scale = _get_float_param(params, "idle_penalty_scale", 0.5)
-        idle_penalty_power = _get_float_param(params, "idle_penalty_power", 1.025)
-        factor = _get_float_param(params, "base_factor", float(base_factor))
-        idle_factor = factor * (profit_target * risk_reward_ratio) / 4.0
-        observed_ratio = abs(br_mid.idle_penalty) / (idle_factor * idle_penalty_scale)
-        if observed_ratio > 0:
-            implied_D = 120 / observed_ratio ** (1 / idle_penalty_power)
-            self.assertAlmostEqualFloat(implied_D, 400.0, tolerance=20.0)
+        # Part 2: Monotonic attenuation validation
+        modes = list(ATTENUATION_MODES) + ["plateau_linear"]
+        base_factor = self.TEST_BASE_FACTOR
+        pnl = 0.05
+        pnl_factor = 1.0
+        for mode in modes:
+            with self.subTest(mode=mode):
+                if mode == "plateau_linear":
+                    mode_params = self.base_params(
+                        exit_attenuation_mode="linear",
+                        exit_plateau=True,
+                        exit_plateau_grace=0.2,
+                        exit_linear_slope=1.0,
+                    )
+                elif mode == "linear":
+                    mode_params = self.base_params(
+                        exit_attenuation_mode="linear", exit_linear_slope=1.2
+                    )
+                elif mode == "power":
+                    mode_params = self.base_params(
+                        exit_attenuation_mode="power", exit_power_tau=0.5
+                    )
+                elif mode == "half_life":
+                    mode_params = self.base_params(
+                        exit_attenuation_mode="half_life", exit_half_life=0.7
+                    )
+                else:
+                    mode_params = self.base_params(exit_attenuation_mode="sqrt")
+
+                ratios = np.linspace(0, 2, 15)
+                values = [
+                    _get_exit_factor(base_factor, pnl, pnl_factor, r, mode_params) for r in ratios
+                ]
+
+                if mode == "plateau_linear":
+                    grace = float(mode_params["exit_plateau_grace"])
+                    filtered = [
+                        (r, v)
+                        for r, v in zip(ratios, values)
+                        if r >= grace - self.TOL_IDENTITY_RELAXED
+                    ]
+                    values_to_check = [v for _, v in filtered]
+                else:
+                    values_to_check = values
+
+                for earlier, later in zip(values_to_check, values_to_check[1:]):
+                    self.assertLessEqual(
+                        later,
+                        earlier + self.TOL_IDENTITY_RELAXED,
+                        f"Non-monotonic attenuation in mode={mode}",
+                    )
 
     def test_exit_factor_threshold_warning_and_non_capping(self):
         """Warning emission without capping when exit_factor_threshold exceeded."""
@@ -388,7 +388,8 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
                 f"Alpha attenuation mismatch tau={tau} alpha={alpha} obs_ratio={observed_ratio} exp_ratio={expected_ratio}",
             )
 
-    def test_extreme_parameter_values(self):
+    def test_reward_calculation_extreme_parameters_stability(self):
+        """Test reward calculation extreme parameters stability."""
         extreme_params = self.base_params(win_reward_factor=1000.0, base_factor=10000.0)
         context = RewardContext(
             pnl=0.05,
@@ -411,6 +412,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
         self.assertFinite(br.total, name="breakdown.total")
 
     def test_exit_attenuation_modes_enumeration(self):
+        """Test exit attenuation modes enumeration."""
         modes = ATTENUATION_MODES_WITH_LEGACY
         for mode in modes:
             with self.subTest(mode=mode):
@@ -436,49 +438,6 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
                 self.assertFinite(br.exit_component, name="breakdown.exit_component")
                 self.assertFinite(br.total, name="breakdown.total")
 
-    def test_exit_factor_monotonic_attenuation(self):
-        """For attenuation modes: factor should be non-increasing w.r.t duration_ratio.
-
-        Modes covered: sqrt, linear, power, half_life, plateau+linear (after grace).
-        Legacy is excluded (non-monotonic by design). Plateau+linear includes flat grace then monotonic.
-        """
-        modes = list(ATTENUATION_MODES) + ["plateau_linear"]
-        base_factor = self.TEST_BASE_FACTOR
-        pnl = 0.05
-        pnl_factor = 1.0
-        for mode in modes:
-            if mode == "plateau_linear":
-                params = self.base_params(
-                    exit_attenuation_mode="linear",
-                    exit_plateau=True,
-                    exit_plateau_grace=0.2,
-                    exit_linear_slope=1.0,
-                )
-            elif mode == "linear":
-                params = self.base_params(exit_attenuation_mode="linear", exit_linear_slope=1.2)
-            elif mode == "power":
-                params = self.base_params(exit_attenuation_mode="power", exit_power_tau=0.5)
-            elif mode == "half_life":
-                params = self.base_params(exit_attenuation_mode="half_life", exit_half_life=0.7)
-            else:
-                params = self.base_params(exit_attenuation_mode="sqrt")
-            ratios = np.linspace(0, 2, 15)
-            values = [_get_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios]
-            if mode == "plateau_linear":
-                grace = float(params["exit_plateau_grace"])
-                filtered = [
-                    (r, v) for r, v in zip(ratios, values) if r >= grace - self.TOL_IDENTITY_RELAXED
-                ]
-                values_to_check = [v for _, v in filtered]
-            else:
-                values_to_check = values
-            for earlier, later in zip(values_to_check, values_to_check[1:]):
-                self.assertLessEqual(
-                    later,
-                    earlier + self.TOL_IDENTITY_RELAXED,
-                    f"Non-monotonic attenuation in mode={mode}",
-                )
-
     def test_exit_factor_boundary_parameters(self):
         """Test parameter edge cases: tau extremes, plateau grace edges, slope zero."""
         base_factor = 50.0
@@ -572,6 +531,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase):
         self.assertLess(vals[-1], ref, "Attenuation should begin after grace boundary")
 
     def test_plateau_continuity_at_grace_boundary(self):
+        """Test plateau continuity at grace boundary."""
         modes = ["sqrt", "linear", "power", "half_life"]
         grace = 0.8
         eps = self.CONTINUITY_EPS_SMALL
ReforceXY/reward_space_analysis/tests/test_statistics.py
index 76956453fb79478cbf7f1acfd423d759f70a9dd0..837235cbe37ceb66b0207054b6f858c7b4abd2ab 100644 (file)
@@ -124,8 +124,8 @@ class TestStatistics(RewardSpaceTestBase):
                 if key in diagnostics:
                     self.assertFinite(diagnostics[key], name=key)
 
-    def test_statistical_functions(self):
-        """Smoke test statistical_hypothesis_tests on synthetic data (API integration)."""
+    def test_statistical_hypothesis_tests_api_integration(self):
+        """Test statistical_hypothesis_tests API integration with synthetic data."""
         base = self.make_stats_df(n=200, seed=self.SEED, idle_pattern="mixed")
         base.loc[:149, ["reward_idle", "reward_hold", "reward_exit"]] = 0.0
         results = statistical_hypothesis_tests(base)
@@ -388,6 +388,41 @@ class TestStatistics(RewardSpaceTestBase):
                 eff = res["effect_size_epsilon_sq"]
                 self.assertFinite(eff)
                 self.assertGreaterEqual(eff, 0)
+
+    def test_bootstrap_confidence_intervals_bounds_ordering(self):
+        """Test bootstrap confidence intervals return ordered finite bounds."""
+        test_data = self.make_stats_df(n=100, seed=self.SEED)
+        results = bootstrap_confidence_intervals(test_data, ["reward", "pnl"], n_bootstrap=100)
+        for metric, (mean, ci_low, ci_high) in results.items():
+            self.assertFinite(mean, name=f"mean[{metric}]")
+            self.assertFinite(ci_low, name=f"ci_low[{metric}]")
+            self.assertFinite(ci_high, name=f"ci_high[{metric}]")
+            self.assertLess(ci_low, ci_high)
+
+    def test_stats_bootstrap_shrinkage_with_sample_size(self):
+        """Bootstrap CI half-width decreases with larger sample (~1/sqrt(n) heuristic)."""
+        small = self._shift_scale_df(80)
+        large = self._shift_scale_df(800)
+        res_small = bootstrap_confidence_intervals(small, ["reward"], n_bootstrap=400)
+        res_large = bootstrap_confidence_intervals(large, ["reward"], n_bootstrap=400)
+        _, lo_s, hi_s = list(res_small.values())[0]
+        _, lo_l, hi_l = list(res_large.values())[0]
+        hw_small = (hi_s - lo_s) / 2.0
+        hw_large = (hi_l - lo_l) / 2.0
+        self.assertFinite(hw_small, name="hw_small")
+        self.assertFinite(hw_large, name="hw_large")
+        self.assertLess(hw_large, hw_small * 0.55)
+
+    def test_stats_bootstrap_constant_distribution_and_diagnostics(self):
+        """Bootstrap on degenerate columns produce (mean≈lo≈hi) zero-width intervals."""
+        df = self._const_df(80)
+        res = bootstrap_confidence_intervals(
+            df, ["reward", "pnl"], n_bootstrap=200, confidence_level=0.95
+        )
+        for _metric, (mean, lo, hi) in res.items():
+            self.assertAlmostEqualFloat(mean, lo, tolerance=2e-09)
+            self.assertAlmostEqualFloat(mean, hi, tolerance=2e-09)
+            self.assertLessEqual(hi - lo, 2e-09)
             if "effect_size_rank_biserial" in res:
                 rb = res["effect_size_rank_biserial"]
                 self.assertFinite(rb)
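The 0.55 factor in test_stats_bootstrap_shrinkage_with_sample_size above is a slack bound on the ~1/sqrt(n) shrinkage named in its docstring; for the sample sizes used:

    sqrt(80 / 800) ≈ 0.316

so a tenfold larger sample should roughly third the CI half-width, and requiring hw_large < 0.55 * hw_small leaves headroom for bootstrap resampling noise.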
ReforceXY/reward_space_analysis/tests/test_utilities.py
index 2d1931ccce821dd7e5fe19b7847a71ab8bf04585..c214aced7d817a71726bfcf6a2e472547f58f903 100644 (file)
@@ -10,17 +10,12 @@ import unittest
 import warnings
 from pathlib import Path
 
-import numpy as np
 import pandas as pd
 
 from reward_space_analysis import (
-    DEFAULT_MODEL_REWARD_PARAMETERS,
     PBRS_INVARIANCE_TOL,
-    _get_float_param,
     apply_potential_shaping,
-    bootstrap_confidence_intervals,
     load_real_episodes,
-    simulate_samples,
 )
 
 from .test_base import RewardSpaceTestBase
@@ -34,6 +29,7 @@ class TestLoadRealEpisodes(RewardSpaceTestBase):
             pickle.dump(obj, f)
 
     def test_top_level_dict_transitions(self):
+        """Test top level dict transitions."""
         df = pd.DataFrame(
             {
                 "pnl": [0.01],
@@ -52,6 +48,7 @@ class TestLoadRealEpisodes(RewardSpaceTestBase):
         self.assertEqual(len(loaded), 1)
 
     def test_mixed_episode_list_warns_and_flattens(self):
+        """Test mixed episode list warns and flattens."""
         ep1 = {"episode_id": 1}
         ep2 = {
             "episode_id": 2,
@@ -76,6 +73,7 @@ class TestLoadRealEpisodes(RewardSpaceTestBase):
         self.assertPlacesEqual(float(loaded.iloc[0]["pnl"]), 0.02, places=7)
 
     def test_non_iterable_transitions_raises(self):
+        """Test non iterable transitions raises."""
         bad = {"transitions": 123}
         p = Path(self.temp_dir) / "bad.pkl"
         self.write_pickle(bad, p)
@@ -83,6 +81,7 @@ class TestLoadRealEpisodes(RewardSpaceTestBase):
             load_real_episodes(p)
 
     def test_enforce_columns_false_fills_na(self):
+        """Test enforce columns false fills na."""
         trans = [
             {"pnl": 0.03, "trade_duration": 10, "idle_duration": 0, "position": 1.0, "action": 2.0}
         ]
@@ -93,6 +92,7 @@ class TestLoadRealEpisodes(RewardSpaceTestBase):
         self.assertTrue(loaded["reward"].isna().all())
 
     def test_casting_numeric_strings(self):
+        """Test casting numeric strings."""
         trans = [
             {
                 "pnl": "0.04",
@@ -130,104 +130,6 @@ class TestLoadRealEpisodes(RewardSpaceTestBase):
         self.assertIn("pnl", loaded_data.columns)
 
 
-class TestBootstrapStatistics(RewardSpaceTestBase):
-    """Grouped tests for bootstrap confidence interval behavior."""
-
-    def test_constant_distribution_bootstrap_and_diagnostics(self):
-        """Degenerate columns produce (mean≈lo≈hi) zero-width intervals."""
-        df = self._const_df(80)
-        res = bootstrap_confidence_intervals(
-            df, ["reward", "pnl"], n_bootstrap=200, confidence_level=0.95
-        )
-        for k, (mean, lo, hi) in res.items():
-            self.assertAlmostEqualFloat(mean, lo, tolerance=2e-09)
-            self.assertAlmostEqualFloat(mean, hi, tolerance=2e-09)
-            self.assertLessEqual(hi - lo, 2e-09)
-
-    def test_bootstrap_shrinkage_with_sample_size(self):
-        """Half-width decreases with larger sample (~1/sqrt(n) heuristic)."""
-        small = self._shift_scale_df(80)
-        large = self._shift_scale_df(800)
-        res_small = bootstrap_confidence_intervals(small, ["reward"], n_bootstrap=400)
-        res_large = bootstrap_confidence_intervals(large, ["reward"], n_bootstrap=400)
-        _, lo_s, hi_s = list(res_small.values())[0]
-        _, lo_l, hi_l = list(res_large.values())[0]
-        hw_small = (hi_s - lo_s) / 2.0
-        hw_large = (hi_l - lo_l) / 2.0
-        self.assertFinite(hw_small, name="hw_small")
-        self.assertFinite(hw_large, name="hw_large")
-        self.assertLess(hw_large, hw_small * 0.55)
-
-    def test_bootstrap_confidence_intervals_basic(self):
-        """Basic CI computation returns ordered finite bounds."""
-        test_data = self.make_stats_df(n=100, seed=self.SEED)
-        results = bootstrap_confidence_intervals(test_data, ["reward", "pnl"], n_bootstrap=100)
-        for metric, (mean, ci_low, ci_high) in results.items():
-            self.assertFinite(mean, name=f"mean[{metric}]")
-            self.assertFinite(ci_low, name=f"ci_low[{metric}]")
-            self.assertFinite(ci_high, name=f"ci_high[{metric}]")
-            self.assertLess(ci_low, ci_high)
-
-    def test_canonical_invariance_flag_and_sum(self):
-        """Canonical mode + no additives -> pbrs_invariant True and Σ shaping ≈ 0."""
-        params = self.base_params(
-            exit_potential_mode="canonical",
-            entry_additive_enabled=False,
-            exit_additive_enabled=False,
-            hold_potential_enabled=True,
-        )
-        df = simulate_samples(
-            params={**params, "max_trade_duration_candles": 100},
-            num_samples=400,
-            seed=self.SEED,
-            base_factor=self.TEST_BASE_FACTOR,
-            profit_target=self.TEST_PROFIT_TARGET,
-            risk_reward_ratio=self.TEST_RR,
-            max_duration_ratio=2.0,
-            trading_mode="margin",
-            pnl_base_std=self.TEST_PNL_STD,
-            pnl_duration_vol_scale=self.TEST_PNL_DUR_VOL_SCALE,
-        )
-        unique_flags = set(df["pbrs_invariant"].unique().tolist())
-        self.assertEqual(unique_flags, {True}, f"Unexpected invariant flags: {unique_flags}")
-        total_shaping = float(df["reward_shaping"].sum())
-        self.assertLess(
-            abs(total_shaping),
-            PBRS_INVARIANCE_TOL,
-            f"Canonical invariance violated: Σ shaping = {total_shaping}",
-        )
-
-    def test_non_canonical_flag_false_and_sum_nonzero(self):
-        """Non-canonical exit potential (progressive_release) -> pbrs_invariant False and Σ shaping != 0."""
-        params = self.base_params(
-            exit_potential_mode="progressive_release",
-            exit_potential_decay=0.25,
-            entry_additive_enabled=False,
-            exit_additive_enabled=False,
-            hold_potential_enabled=True,
-        )
-        df = simulate_samples(
-            params={**params, "max_trade_duration_candles": 100},
-            num_samples=400,
-            seed=self.SEED,
-            base_factor=self.TEST_BASE_FACTOR,
-            profit_target=self.TEST_PROFIT_TARGET,
-            risk_reward_ratio=self.TEST_RR,
-            max_duration_ratio=2.0,
-            trading_mode="margin",
-            pnl_base_std=self.TEST_PNL_STD,
-            pnl_duration_vol_scale=self.TEST_PNL_DUR_VOL_SCALE,
-        )
-        unique_flags = set(df["pbrs_invariant"].unique().tolist())
-        self.assertEqual(unique_flags, {False}, f"Unexpected invariant flags: {unique_flags}")
-        total_shaping = float(df["reward_shaping"].sum())
-        self.assertGreater(
-            abs(total_shaping),
-            PBRS_INVARIANCE_TOL * 10,
-            f"Expected non-zero Σ shaping in non-canonical mode (got {total_shaping})",
-        )
-
-
 class TestReportFormatting(RewardSpaceTestBase):
     """Tests for report formatting elements not covered elsewhere."""
 
@@ -255,36 +157,6 @@ class TestReportFormatting(RewardSpaceTestBase):
             "Tolerance constant value should appear, not raw literal",
         )
 
-    def test_pbrs_non_canonical_report_generation(self):
-        """Generate synthetic invariance section with non-zero shaping to assert Non-canonical classification."""
-        df = pd.DataFrame(
-            {
-                "reward_shaping": [0.01, -0.002],
-                "reward_entry_additive": [0.0, 0.0],
-                "reward_exit_additive": [0.001, 0.0],
-            }
-        )
-        total_shaping = df["reward_shaping"].sum()
-        self.assertGreater(abs(total_shaping), PBRS_INVARIANCE_TOL)
-        invariance_status = "❌ Non-canonical"
-        section = []
-        section.append("**PBRS Invariance Summary:**\n")
-        section.append("| Field | Value |\n")
-        section.append("|-------|-------|\n")
-        section.append(f"| Invariance | {invariance_status} |\n")
-        section.append(f"| Note | Total shaping = {total_shaping:.6f} (non-zero) |\n")
-        section.append(f"| Σ Shaping Reward | {total_shaping:.6f} |\n")
-        section.append(f"| Abs Σ Shaping Reward | {abs(total_shaping):.6e} |\n")
-        section.append(f"| Σ Entry Additive | {df['reward_entry_additive'].sum():.6f} |\n")
-        section.append(f"| Σ Exit Additive | {df['reward_exit_additive'].sum():.6f} |\n")
-        content = "".join(section)
-        self.assertIn("❌ Non-canonical", content)
-        self.assertRegex(content, "Σ Shaping Reward \\| 0\\.008000 \\|")
-        m_abs = re.search("Abs Σ Shaping Reward \\| ([0-9.]+e[+-][0-9]{2}) \\|", content)
-        self.assertIsNotNone(m_abs)
-        if m_abs:
-            self.assertAlmostEqual(abs(total_shaping), float(m_abs.group(1)), places=12)
-
     def test_additive_activation_deterministic_contribution(self):
         """Additives enabled increase total reward; shaping impact limited."""
         base = self.base_params(
@@ -320,117 +192,6 @@ class TestReportFormatting(RewardSpaceTestBase):
         self.assertLess(abs(s1 - s0), 0.2)
         self.assertGreater(t1 - _t0, 0.0, "Total reward should increase with additives present")
 
-    def test_report_cumulative_invariance_aggregation(self):
-        """Canonical telescoping term: small per-step mean drift, bounded increments."""
-        params = self.base_params(
-            hold_potential_enabled=True,
-            entry_additive_enabled=False,
-            exit_additive_enabled=False,
-            exit_potential_mode="canonical",
-        )
-        gamma = _get_float_param(
-            params, "potential_gamma", DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95)
-        )
-        rng = np.random.default_rng(321)
-        last_potential = 0.0
-        telescoping_sum = 0.0
-        max_abs_step = 0.0
-        steps = 0
-        for _ in range(500):
-            is_exit = rng.uniform() < 0.1
-            current_pnl = float(rng.normal(0, 0.05))
-            current_dur = float(rng.uniform(0, 1))
-            next_pnl = 0.0 if is_exit else float(rng.normal(0, 0.05))
-            next_dur = 0.0 if is_exit else float(rng.uniform(0, 1))
-            _tot, _shap, next_potential = apply_potential_shaping(
-                base_reward=0.0,
-                current_pnl=current_pnl,
-                current_duration_ratio=current_dur,
-                next_pnl=next_pnl,
-                next_duration_ratio=next_dur,
-                is_exit=is_exit,
-                last_potential=last_potential,
-                params=params,
-            )
-            inc = gamma * next_potential - last_potential
-            telescoping_sum += inc
-            if abs(inc) > max_abs_step:
-                max_abs_step = abs(inc)
-            steps += 1
-            if is_exit:
-                last_potential = 0.0
-            else:
-                last_potential = next_potential
-        mean_drift = telescoping_sum / max(1, steps)
-        self.assertLess(
-            abs(mean_drift),
-            0.02,
-            f"Per-step telescoping drift too large (mean={mean_drift}, steps={steps})",
-        )
-        self.assertLessEqual(
-            max_abs_step,
-            self.PBRS_MAX_ABS_SHAPING,
-            f"Unexpected large telescoping increment (max={max_abs_step})",
-        )
-
-    def test_report_explicit_non_invariance_progressive_release(self):
-        """progressive_release should generally yield non-zero cumulative shaping (release leak)."""
-        params = self.base_params(
-            hold_potential_enabled=True,
-            entry_additive_enabled=False,
-            exit_additive_enabled=False,
-            exit_potential_mode="progressive_release",
-            exit_potential_decay=0.25,
-        )
-        rng = np.random.default_rng(321)
-        last_potential = 0.0
-        shaping_sum = 0.0
-        for _ in range(160):
-            is_exit = rng.uniform() < 0.15
-            next_pnl = 0.0 if is_exit else float(rng.normal(0, 0.07))
-            next_dur = 0.0 if is_exit else float(rng.uniform(0, 1))
-            _tot, shap, next_pot = apply_potential_shaping(
-                base_reward=0.0,
-                current_pnl=float(rng.normal(0, 0.07)),
-                current_duration_ratio=float(rng.uniform(0, 1)),
-                next_pnl=next_pnl,
-                next_duration_ratio=next_dur,
-                is_exit=is_exit,
-                last_potential=last_potential,
-                params=params,
-            )
-            shaping_sum += shap
-            last_potential = 0.0 if is_exit else next_pot
-        self.assertGreater(
-            abs(shaping_sum),
-            PBRS_INVARIANCE_TOL * 50,
-            f"Expected non-zero Σ shaping (got {shaping_sum})",
-        )
-
-    def test_gamma_extremes(self):
-        """Gamma=0 and gamma≈1 boundary behaviours produce bounded shaping and finite potentials."""
-        for gamma in [0.0, 0.999999]:
-            params = self.base_params(
-                hold_potential_enabled=True,
-                entry_additive_enabled=False,
-                exit_additive_enabled=False,
-                exit_potential_mode="canonical",
-                potential_gamma=gamma,
-            )
-            _tot, shap, next_pot = apply_potential_shaping(
-                base_reward=0.0,
-                current_pnl=0.02,
-                current_duration_ratio=0.3,
-                next_pnl=0.025,
-                next_duration_ratio=0.35,
-                is_exit=False,
-                last_potential=0.0,
-                params=params,
-            )
-            self.assertTrue(np.isfinite(shap))
-            self.assertTrue(np.isfinite(next_pot))
-            self.assertLessEqual(abs(shap), self.PBRS_MAX_ABS_SHAPING)
-
 
 class TestCsvAndSimulationOptions(RewardSpaceTestBase):
     """CLI-level tests: CSV encoding and simulate_unrealized_pnl option effects."""
@@ -464,43 +225,6 @@ class TestCsvAndSimulationOptions(RewardSpaceTestBase):
         allowed = {0, 1, 2, 3, 4}
         self.assertTrue(set((int(v) for v in values)).issubset(allowed))
 
-    def test_unrealized_pnl_affects_hold_potential(self):
-        """--unrealized_pnl should alter hold next_potential distribution vs default."""
-        out_default = self.output_path / "sim_default"
-        out_sim = self.output_path / "sim_unrealized"
-        base_args = ["--num_samples", "800", "--seed", str(self.SEED), "--out_dir"]
-        cmd_default = [sys.executable, "reward_space_analysis.py", *base_args, str(out_default)]
-        res_def = subprocess.run(
-            cmd_default, capture_output=True, text=True, cwd=Path(__file__).parent.parent
-        )
-        self.assertEqual(res_def.returncode, 0, f"CLI default run failed: {res_def.stderr}")
-        cmd_sim = [
-            sys.executable,
-            "reward_space_analysis.py",
-            *base_args,
-            str(out_sim),
-            "--unrealized_pnl",
-        ]
-        res_sim = subprocess.run(
-            cmd_sim, capture_output=True, text=True, cwd=Path(__file__).parent.parent
-        )
-        self.assertEqual(res_sim.returncode, 0, f"CLI simulated run failed: {res_sim.stderr}")
-        df_def = pd.read_csv(out_default / "reward_samples.csv")
-        df_sim = pd.read_csv(out_sim / "reward_samples.csv")
-        mask_hold_def = (df_def["action"] == 0) & df_def["position"].isin([0.0, 1.0])
-        mask_hold_sim = (df_sim["action"] == 0) & df_sim["position"].isin([0.0, 1.0])
-        self.assertGreater(int(mask_hold_def.sum()), 0, "No hold samples in default run")
-        self.assertGreater(int(mask_hold_sim.sum()), 0, "No hold samples in simulate run")
-        mean_next_def = float(df_def.loc[mask_hold_def, "next_potential"].mean())
-        mean_next_sim = float(df_sim.loc[mask_hold_sim, "next_potential"].mean())
-        self.assertFinite(mean_next_def, name="mean_next_def")
-        self.assertFinite(mean_next_sim, name="mean_next_sim")
-        self.assertGreater(
-            abs(mean_next_sim - mean_next_def),
-            self.TOL_GENERIC_EQ,
-            f"No detectable effect of --unrealized_pnl on Φ(s): def={mean_next_def:.6f}, sim={mean_next_sim:.6f}",
-        )
-
 
 class TestParamsPropagation(RewardSpaceTestBase):
     """Integration tests to validate max_trade_duration_candles propagation via CLI params and dynamic flag."""
index cffb07d83e979990cd3b39f6dddcd2bf7e1d13d5..6e97559828f8971f34aabc1ee8404d7d84ca3531 100644 (file)
@@ -131,6 +131,11 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/ec/16/114df1c291c22cac3b0c127a73e0af5c12ed7bbb6558d310429a0ae24023/coverage-7.10.7-py3-none-any.whl", hash = "sha256:f7941f6f2fe6dd6807a1208737b8a0cbcf1cc6d7b07d24998ad2d63590868260", size = 209952, upload-time = "2025-09-21T20:03:53.918Z" },
 ]
 
+[package.optional-dependencies]
+toml = [
+    { name = "tomli", marker = "python_full_version < '3.10'" },
+]
+
 [[package]]
 name = "coverage"
 version = "7.11.0"
@@ -235,6 +240,11 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/5f/04/642c1d8a448ae5ea1369eac8495740a79eb4e581a9fb0cbdce56bbf56da1/coverage-7.11.0-py3-none-any.whl", hash = "sha256:4b7589765348d78fb4e5fb6ea35d07564e387da2fc5efff62e0222971f155f68", size = 207761, upload-time = "2025-10-15T15:15:06.439Z" },
 ]
 
+[package.optional-dependencies]
+toml = [
+    { name = "tomli", marker = "python_full_version >= '3.10' and python_full_version <= '3.11'" },
+]
+
 [[package]]
 name = "exceptiongroup"
 version = "1.3.0"
@@ -603,6 +613,21 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/a8/a4/20da314d277121d6534b3a980b29035dcd51e6744bd79075a6ce8fa4eb8d/pytest-8.4.2-py3-none-any.whl", hash = "sha256:872f880de3fc3a5bdc88a11b39c9710c3497a547cfa9320bc3c5e62fbf272e79", size = 365750, upload-time = "2025-09-04T14:34:20.226Z" },
 ]
 
+[[package]]
+name = "pytest-cov"
+version = "7.0.0"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "coverage", version = "7.10.7", source = { registry = "https://pypi.org/simple" }, extra = ["toml"], marker = "python_full_version < '3.10'" },
+    { name = "coverage", version = "7.11.0", source = { registry = "https://pypi.org/simple" }, extra = ["toml"], marker = "python_full_version >= '3.10'" },
+    { name = "pluggy" },
+    { name = "pytest" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/5e/f7/c933acc76f5208b3b00089573cf6a2bc26dc80a8aece8f52bb7d6b1855ca/pytest_cov-7.0.0.tar.gz", hash = "sha256:33c97eda2e049a0c5298e91f519302a1334c26ac65c1a483d6206fd458361af1", size = 54328, upload-time = "2025-09-09T10:57:02.113Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" },
+]
+
 [[package]]
 name = "python-dateutil"
 version = "2.9.0.post0"
@@ -646,6 +671,7 @@ dev = [
     { name = "coverage", version = "7.10.7", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" },
     { name = "coverage", version = "7.11.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" },
     { name = "pytest" },
+    { name = "pytest-cov" },
     { name = "ruff" },
 ]
 
@@ -662,6 +688,7 @@ requires-dist = [
 dev = [
     { name = "coverage" },
     { name = "pytest", specifier = ">=6.0" },
+    { name = "pytest-cov", specifier = ">=7.0.0" },
     { name = "ruff" },
 ]