]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
test(reward): add parameter validation, legacy step, piecewise slope=0, non-negative...
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 6 Oct 2025 12:37:55 +0000 (14:37 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 6 Oct 2025 12:37:55 +0000 (14:37 +0200)
ReforceXY/reward_space_analysis/test_reward_space_analysis.py

index 8ffa10b7f745916e0ae7721a1c587972edb85b79..5e61c742a9664c2960cf302698015e5e48c0fb5a 100644 (file)
@@ -7,7 +7,9 @@ Covers:
 - Reward alignment with environment
 """
 
+import dataclasses
 import json
+import math
 import pickle
 import shutil
 import subprocess
@@ -60,8 +62,26 @@ class RewardSpaceTestBase(unittest.TestCase):
         """Clean up temporary files."""
         shutil.rmtree(self.temp_dir, ignore_errors=True)
 
-    def assertAlmostEqualFloat(self, first, second, tolerance=1e-6, msg=None):
-        """Helper for floating point comparisons."""
+    def assertAlmostEqualFloat(
+        self,
+        first: float,
+        second: float,
+        tolerance: float = 1e-6,
+        msg: str | None = None,
+    ) -> None:
+        """Helper for floating point comparisons with explicit type hints.
+
+        Parameters
+        ----------
+        first : float
+            First value to compare.
+        second : float
+            Second value to compare.
+        tolerance : float, default 1e-6
+            Absolute tolerance allowed between the two values.
+        msg : str | None
+            Optional message to display on failure.
+        """
         if abs(first - second) > tolerance:
             self.fail(f"{first} != {second} within {tolerance}: {msg}")
 
@@ -318,58 +338,298 @@ class TestRewardAlignment(RewardSpaceTestBase):
         )
 
     def test_force_action_logic(self):
-        """Test force action behavior consistency."""
-        # Take profit should generally be positive
+        """Validate forced exits (take profit, stop loss, timeout) produce consistent exit rewards.
+
+        Algorithmic expectations:
+        - ForceActions override the provided action and trigger exit reward path.
+        - Exit reward sign should match PnL sign (exit_factor is positive under invariants).
+        - Take profit reward magnitude > stop loss reward magnitude for comparable |PnL|.
+        - Timeout uses current PnL (can be positive or negative); we assert sign consistency only.
+        """
+        base_factor = 100.0
+        profit_target = 0.06
+
+        # Take profit (positive pnl)
         tp_context = RewardContext(
-            pnl=0.05,  # Good profit
-            trade_duration=20,
+            pnl=0.05,
+            trade_duration=50,
             idle_duration=0,
             max_trade_duration=100,
-            max_unrealized_profit=0.06,
-            min_unrealized_profit=0.04,
+            max_unrealized_profit=0.07,
+            min_unrealized_profit=0.01,
             position=Positions.Long,
-            action=Actions.Long_exit,
+            action=Actions.Neutral,  # action ignored due to force_action
             force_action=ForceActions.Take_profit,
         )
-
         tp_breakdown = calculate_reward(
             tp_context,
             self.DEFAULT_PARAMS,
-            base_factor=100.0,
-            profit_target=0.06,
+            base_factor=base_factor,
+            profit_target=profit_target,
             risk_reward_ratio=2.0,
             short_allowed=True,
             action_masking=True,
         )
+        self.assertGreater(
+            tp_breakdown.exit_component,
+            0.0,
+            "Take profit should yield positive exit reward",
+        )
+        # Exit reward should be the only active component
+        self.assertEqual(tp_breakdown.invalid_penalty, 0.0)
+        self.assertEqual(tp_breakdown.idle_penalty, 0.0)
+        self.assertEqual(tp_breakdown.holding_penalty, 0.0)
+        self.assertEqual(tp_breakdown.total, tp_breakdown.exit_component)
+        self.assertAlmostEqualFloat(
+            math.copysign(1, tp_breakdown.exit_component),
+            1.0,
+            msg="TP reward sign mismatch",
+        )
 
-        # Stop loss should generally be negative
+        # Stop loss (negative pnl)
         sl_context = RewardContext(
-            pnl=-0.03,  # Loss
-            trade_duration=15,
+            pnl=-0.03,
+            trade_duration=50,
             idle_duration=0,
             max_trade_duration=100,
             max_unrealized_profit=0.01,
-            min_unrealized_profit=-0.04,
+            min_unrealized_profit=-0.05,
             position=Positions.Long,
-            action=Actions.Long_exit,
+            action=Actions.Neutral,
             force_action=ForceActions.Stop_loss,
         )
-
         sl_breakdown = calculate_reward(
             sl_context,
             self.DEFAULT_PARAMS,
+            base_factor=base_factor,
+            profit_target=profit_target,
+            risk_reward_ratio=2.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        self.assertLess(
+            sl_breakdown.exit_component,
+            0.0,
+            "Stop loss should yield negative exit reward",
+        )
+        self.assertEqual(sl_breakdown.invalid_penalty, 0.0)
+        self.assertEqual(sl_breakdown.idle_penalty, 0.0)
+        self.assertEqual(sl_breakdown.holding_penalty, 0.0)
+        self.assertEqual(sl_breakdown.total, sl_breakdown.exit_component)
+        self.assertAlmostEqualFloat(
+            math.copysign(1, sl_breakdown.exit_component),
+            -1.0,
+            msg="SL reward sign mismatch",
+        )
+
+        # Timeout (use small positive pnl)
+        to_context = RewardContext(
+            pnl=0.01,
+            trade_duration=120,  # beyond default max
+            idle_duration=0,
+            max_trade_duration=100,
+            max_unrealized_profit=0.02,
+            min_unrealized_profit=-0.01,
+            position=Positions.Long,
+            action=Actions.Neutral,
+            force_action=ForceActions.Timeout,
+        )
+        to_breakdown = calculate_reward(
+            to_context,
+            self.DEFAULT_PARAMS,
+            base_factor=base_factor,
+            profit_target=profit_target,
+            risk_reward_ratio=2.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        self.assertGreaterEqual(
+            to_breakdown.exit_component,
+            0.0,
+            "Timeout reward should be non-negative with positive PnL",
+        )
+        self.assertEqual(to_breakdown.invalid_penalty, 0.0)
+        self.assertEqual(to_breakdown.idle_penalty, 0.0)
+        self.assertEqual(to_breakdown.holding_penalty, 0.0)
+        self.assertEqual(to_breakdown.total, to_breakdown.exit_component)
+
+        # Magnitude ordering: TP reward magnitude > SL reward magnitude (absolute values, given larger |pnl| for TP)
+        self.assertGreater(
+            abs(tp_breakdown.exit_component),
+            abs(sl_breakdown.exit_component),
+            "Take profit reward magnitude should exceed stop loss reward magnitude",
+        )
+
+    def test_max_idle_duration_candles_logic(self):
+        """Idle penalty scaling test with explicit max_idle_duration_candles (no force-action comparisons)."""
+        params_small = self.DEFAULT_PARAMS.copy()
+        params_large = self.DEFAULT_PARAMS.copy()
+        # Activate explicit max idle durations
+        params_small["max_idle_duration_candles"] = 50
+        params_large["max_idle_duration_candles"] = 200
+
+        base_factor = 100.0
+        idle_duration = 40  # below large threshold, near small threshold
+        context = RewardContext(
+            pnl=0.0,
+            trade_duration=0,
+            idle_duration=idle_duration,
+            max_trade_duration=128,
+            max_unrealized_profit=0.0,
+            min_unrealized_profit=0.0,
+            position=Positions.Neutral,
+            action=Actions.Neutral,
+            force_action=None,
+        )
+
+        breakdown_small = calculate_reward(
+            context,
+            params_small,
+            base_factor,
+            profit_target=0.03,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        breakdown_large = calculate_reward(
+            context,
+            params_large,
+            base_factor,
+            profit_target=0.03,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+
+        self.assertLess(breakdown_small.idle_penalty, 0.0)
+        self.assertLess(breakdown_large.idle_penalty, 0.0)
+        # Because denominator is larger, absolute penalty (negative) should be smaller in magnitude
+        self.assertGreater(
+            breakdown_large.idle_penalty,
+            breakdown_small.idle_penalty,
+            f"Expected less severe penalty with larger max_idle_duration_candles (large={breakdown_large.idle_penalty}, small={breakdown_small.idle_penalty})",
+        )
+
+    def test_idle_penalty_fallback_and_proportionality(self):
+        """When max_idle_duration_candles <= 0, fallback to max_trade_duration and ensure proportional scaling.
+
+        Also validates that penalty doubles (approximately) when idle_duration doubles (holding other params constant).
+        """
+        params = self.DEFAULT_PARAMS.copy()
+        params["max_idle_duration_candles"] = 0  # force fallback
+        base_factor = 90.0
+        profit_target = 0.03
+        risk_reward_ratio = 1.0
+
+        # Two contexts with different idle durations
+        ctx_a = RewardContext(
+            pnl=0.0,
+            trade_duration=0,
+            idle_duration=20,
+            max_trade_duration=100,
+            max_unrealized_profit=0.0,
+            min_unrealized_profit=0.0,
+            position=Positions.Neutral,
+            action=Actions.Neutral,
+            force_action=None,
+        )
+        ctx_b = dataclasses.replace(ctx_a, idle_duration=40)
+
+        br_a = calculate_reward(
+            ctx_a,
+            params,
+            base_factor=base_factor,
+            profit_target=profit_target,
+            risk_reward_ratio=risk_reward_ratio,
+            short_allowed=True,
+            action_masking=True,
+        )
+        br_b = calculate_reward(
+            ctx_b,
+            params,
+            base_factor=base_factor,
+            profit_target=profit_target,
+            risk_reward_ratio=risk_reward_ratio,
+            short_allowed=True,
+            action_masking=True,
+        )
+
+        self.assertLess(br_a.idle_penalty, 0.0)
+        self.assertLess(br_b.idle_penalty, 0.0)
+
+        # Ratio of penalties should be close to 2 (linear power=1 scaling) allowing small numerical tolerance
+        ratio = (
+            br_b.idle_penalty / br_a.idle_penalty if br_a.idle_penalty != 0 else None
+        )
+        self.assertIsNotNone(ratio, "Idle penalty A should not be zero")
+        # Both penalties are negative, ratio should be ~ (40/20) = 2, hence ratio ~ 2
+        self.assertAlmostEqualFloat(
+            abs(ratio),
+            2.0,
+            tolerance=0.2,
+            msg=f"Idle penalty proportionality mismatch (ratio={ratio})",
+        )
+
+    def test_exit_factor_threshold_warning_non_capping(self):
+        """Ensure exit_factor_threshold does not cap the exit factor (warning-only semantics).
+
+        We approximate by computing two rewards: a baseline and an amplified one (via larger base_factor & pnl) and
+        ensure the amplified reward scales proportionally beyond the threshold rather than flattening.
+        """
+        params = self.DEFAULT_PARAMS.copy()
+        # Remove base_factor from params so that the function uses the provided argument (makes scaling observable)
+        params.pop("base_factor", None)
+        threshold = float(params.get("exit_factor_threshold", 10_000.0))
+
+        context = RewardContext(
+            pnl=0.08,  # above typical profit_target * RR
+            trade_duration=10,
+            idle_duration=0,
+            max_trade_duration=100,
+            max_unrealized_profit=0.09,
+            min_unrealized_profit=0.0,
+            position=Positions.Long,
+            action=Actions.Long_exit,
+            force_action=None,
+        )
+
+        # Baseline with moderate base_factor
+        baseline = calculate_reward(
+            context,
+            params,
             base_factor=100.0,
-            profit_target=0.06,
+            profit_target=0.03,
             risk_reward_ratio=2.0,
             short_allowed=True,
             action_masking=True,
         )
 
-        # Take profit should be better than stop loss
+        # Amplified: choose a much larger base_factor (ensure > threshold relative scale)
+        amplified_base_factor = max(
+            100.0 * 50, threshold * 2.0 / max(context.pnl, 1e-9)
+        )
+        amplified = calculate_reward(
+            context,
+            params,
+            base_factor=amplified_base_factor,
+            profit_target=0.03,
+            risk_reward_ratio=2.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+
+        scale_observed = (
+            amplified.exit_component / baseline.exit_component
+            if baseline.exit_component
+            else 0.0
+        )
+        self.assertGreater(baseline.exit_component, 0.0)
+        self.assertGreater(amplified.exit_component, baseline.exit_component)
+        # Expect at least ~10x increase (safe margin) to confirm absence of clipping
         self.assertGreater(
-            tp_breakdown.total,
-            sl_breakdown.total,
-            "Take profit should yield higher reward than stop loss",
+            scale_observed,
+            10.0,
+            f"Amplified reward did not scale sufficiently (factor={scale_observed:.2f}, amplified_base_factor={amplified_base_factor})",
         )
 
     def test_exit_factor_calculation(self):
@@ -667,7 +927,6 @@ class TestStatisticalValidation(RewardSpaceTestBase):
 
     def test_exit_factor_mathematical_formulas(self):
         """Test mathematical correctness of exit factor calculations."""
-        import math
 
         from reward_space_analysis import (
             Actions,
@@ -701,8 +960,7 @@ class TestStatisticalValidation(RewardSpaceTestBase):
         )
 
         # Mathematical validation: alpha = -ln(tau) = -ln(0.5) ≈ 0.693
-        expected_alpha = -math.log(0.5)
-        expected_attenuation = 1.0 / (1.0 + expected_alpha * duration_ratio)
+        # Analytical alpha value (unused directly now, kept for clarity): -ln(0.5) ≈ 0.693
 
         # The reward should be attenuated by this factor
         self.assertGreater(
@@ -1031,6 +1289,55 @@ class TestBoundaryConditions(RewardSpaceTestBase):
                     f"Total reward should be finite for mode {mode}",
                 )
 
+    def test_unknown_exit_factor_mode_fallback_piecewise(self):
+        """Unknown exit_factor_mode must fallback to piecewise attenuation."""
+        base_factor = 150.0
+        context = RewardContext(
+            pnl=0.05,
+            trade_duration=160,
+            idle_duration=0,
+            max_trade_duration=100,
+            max_unrealized_profit=0.07,
+            min_unrealized_profit=0.0,
+            position=Positions.Long,
+            action=Actions.Long_exit,
+            force_action=None,
+        )
+        params_piecewise = self.DEFAULT_PARAMS.copy()
+        params_piecewise["exit_factor_mode"] = "piecewise"
+        params_unknown = self.DEFAULT_PARAMS.copy()
+        params_unknown["exit_factor_mode"] = "unrecognized_mode_xyz"
+
+        reward_piecewise = calculate_reward(
+            context,
+            params_piecewise,
+            base_factor=base_factor,
+            profit_target=0.03,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        reward_unknown = calculate_reward(
+            context,
+            params_unknown,
+            base_factor=base_factor,
+            profit_target=0.03,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        self.assertGreater(
+            reward_piecewise.exit_component,
+            0.0,
+            "Piecewise exit reward should be positive with positive pnl",
+        )
+        self.assertAlmostEqualFloat(
+            reward_piecewise.exit_component,
+            reward_unknown.exit_component,
+            tolerance=1e-9,
+            msg="Fallback for unknown mode should produce identical result to piecewise",
+        )
+
 
 class TestHelperFunctions(RewardSpaceTestBase):
     """Test utility and helper functions."""
@@ -1545,6 +1852,505 @@ class TestPrivateFunctions(RewardSpaceTestBase):
                 f"Penalty should increase with duration: {penalties[i]} > {penalties[i-1]}",
             )
 
+    def test_new_invariant_and_warn_parameters(self):
+        """Ensure new tunables (check_invariants, exit_factor_threshold) exist and behave.
+
+        Uses a very large base_factor to trigger potential warning condition without capping.
+        """
+        params = self.DEFAULT_PARAMS.copy()
+        self.assertIn("check_invariants", params)
+        self.assertIn("exit_factor_threshold", params)
+
+        base_factor = 1e7  # exaggerated factor
+        context = RewardContext(
+            pnl=0.05,
+            trade_duration=300,
+            idle_duration=0,
+            max_trade_duration=100,
+            max_unrealized_profit=0.06,
+            min_unrealized_profit=0.0,
+            position=Positions.Long,
+            action=Actions.Long_exit,
+            force_action=None,
+        )
+        breakdown = calculate_reward(
+            context,
+            params,
+            base_factor=base_factor,
+            profit_target=0.03,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        self.assertTrue(
+            np.isfinite(breakdown.exit_component), "Exit component must be finite"
+        )
+
+
+class TestRewardRobustness(RewardSpaceTestBase):
+    """Tests implementing all prioritized robustness enhancements.
+
+    Covers:
+    - Reward decomposition integrity (total == sum of active component exactly)
+    - Exit factor monotonic attenuation per mode where mathematically expected
+    - Boundary parameter conditions (tau extremes, piecewise grace edges, linear slope = 0)
+    - Non-linear power tests for idle & holding penalties (power != 1)
+    - Public wrapper `compute_exit_factor` (avoids private function usage in new tests)
+    - Warning emission (exit_factor_threshold) without capping
+    """
+
+    def _mk_context(
+        self,
+        pnl: float = 0.04,
+        trade_duration: int = 40,
+        idle_duration: int = 0,
+        max_trade_duration: int = 100,
+        max_unrealized_profit: float = 0.05,
+        min_unrealized_profit: float = 0.01,
+        position: Positions = Positions.Long,
+        action: Actions = Actions.Long_exit,
+        force_action: ForceActions | None = None,
+    ) -> RewardContext:
+        return RewardContext(
+            pnl=pnl,
+            trade_duration=trade_duration,
+            idle_duration=idle_duration,
+            max_trade_duration=max_trade_duration,
+            max_unrealized_profit=max_unrealized_profit,
+            min_unrealized_profit=min_unrealized_profit,
+            position=position,
+            action=action,
+            force_action=force_action,
+        )
+
+    def test_decomposition_integrity(self):
+        """Assert reward_total equals exactly the single active component (mutual exclusivity).
+
+        We sample a grid of mutually exclusive scenarios and validate decomposition.
+        """
+        scenarios = [
+            # Idle penalty only
+            dict(
+                ctx=RewardContext(
+                    pnl=0.0,
+                    trade_duration=0,
+                    idle_duration=25,
+                    max_trade_duration=100,
+                    max_unrealized_profit=0.0,
+                    min_unrealized_profit=0.0,
+                    position=Positions.Neutral,
+                    action=Actions.Neutral,
+                    force_action=None,
+                ),
+                active="idle_penalty",
+            ),
+            # Holding penalty only
+            dict(
+                ctx=RewardContext(
+                    pnl=0.0,
+                    trade_duration=150,
+                    idle_duration=0,
+                    max_trade_duration=100,
+                    max_unrealized_profit=0.0,
+                    min_unrealized_profit=0.0,
+                    position=Positions.Long,
+                    action=Actions.Neutral,
+                    force_action=None,
+                ),
+                active="holding_penalty",
+            ),
+            # Exit reward only (positive pnl)
+            dict(
+                ctx=self._mk_context(pnl=0.03, trade_duration=60),
+                active="exit_component",
+            ),
+            # Invalid action only
+            dict(
+                ctx=RewardContext(
+                    pnl=0.01,
+                    trade_duration=10,
+                    idle_duration=0,
+                    max_trade_duration=100,
+                    max_unrealized_profit=0.02,
+                    min_unrealized_profit=0.0,
+                    position=Positions.Short,
+                    action=Actions.Long_exit,  # invalid
+                    force_action=None,
+                ),
+                active="invalid_penalty",
+            ),
+        ]
+        for sc in scenarios:  # type: ignore[var-annotated]
+            ctx_obj: RewardContext = sc["ctx"]  # type: ignore[index]
+            active_label: str = sc["active"]  # type: ignore[index]
+            with self.subTest(active=active_label):
+                br = calculate_reward(
+                    ctx_obj,
+                    self.DEFAULT_PARAMS,
+                    base_factor=100.0,
+                    profit_target=0.03,
+                    risk_reward_ratio=1.0,
+                    short_allowed=True,
+                    action_masking=(active_label != "invalid_penalty"),
+                )
+                components = [
+                    br.invalid_penalty,
+                    br.idle_penalty,
+                    br.holding_penalty,
+                    br.exit_component,
+                ]
+                non_zero = [
+                    c for c in components if not math.isclose(c, 0.0, abs_tol=1e-12)
+                ]
+                self.assertEqual(
+                    len(non_zero),
+                    1,
+                    f"Exactly one component must be active for {sc['active']}",
+                )
+                self.assertAlmostEqualFloat(
+                    br.total,
+                    non_zero[0],
+                    tolerance=1e-9,
+                    msg=f"Total mismatch for {sc['active']}",
+                )
+
+    def test_exit_factor_monotonic_attenuation(self):
+        """For attenuation modes: factor should be non-increasing w.r.t duration_ratio.
+
+        Modes covered: sqrt, linear, power, half_life, piecewise (after grace).
+        Legacy is excluded (non-monotonic by design: step change). Piecewise includes flat grace then monotonic.
+        """
+        from reward_space_analysis import compute_exit_factor
+
+        modes = ["sqrt", "linear", "power", "half_life", "piecewise"]
+        base_factor = 100.0
+        pnl = 0.05
+        pnl_factor = 1.0
+        for mode in modes:
+            params = self.DEFAULT_PARAMS.copy()
+            params["exit_factor_mode"] = mode
+            if mode == "linear":
+                params["exit_linear_slope"] = 1.2
+            if mode == "piecewise":
+                params["exit_piecewise_grace"] = 0.2
+                params["exit_piecewise_slope"] = 1.0
+            if mode == "power":
+                params["exit_power_tau"] = 0.5
+            if mode == "half_life":
+                params["exit_half_life"] = 0.7
+
+            ratios = np.linspace(0, 2, 15)
+            values = [
+                compute_exit_factor(base_factor, pnl, pnl_factor, r, params)
+                for r in ratios
+            ]
+            # Piecewise: ignore initial flat region when checking monotonic decrease
+            if mode == "piecewise":
+                grace = float(params["exit_piecewise_grace"])  # type: ignore[index]
+                filtered = [(r, v) for r, v in zip(ratios, values) if r >= grace - 1e-9]
+                values_to_check = [v for _, v in filtered]
+            else:
+                values_to_check = values
+            for earlier, later in zip(values_to_check, values_to_check[1:]):
+                self.assertLessEqual(
+                    later,
+                    earlier + 1e-9,
+                    f"Non-monotonic attenuation in mode={mode}",
+                )
+
+    def test_exit_factor_boundary_parameters(self):
+        """Test parameter edge cases: tau extremes, grace edges, slope zero."""
+        from reward_space_analysis import compute_exit_factor
+
+        base_factor = 50.0
+        pnl = 0.02
+        pnl_factor = 1.0
+        # Tau near 1 (minimal attenuation) vs tau near 0 (strong attenuation)
+        params_hi = self.DEFAULT_PARAMS.copy()
+        params_hi.update({"exit_factor_mode": "power", "exit_power_tau": 0.999999})
+        params_lo = self.DEFAULT_PARAMS.copy()
+        params_lo.update({"exit_factor_mode": "power", "exit_power_tau": 1e-6})
+        r = 1.5
+        hi_val = compute_exit_factor(base_factor, pnl, pnl_factor, r, params_hi)
+        lo_val = compute_exit_factor(base_factor, pnl, pnl_factor, r, params_lo)
+        self.assertGreater(
+            hi_val,
+            lo_val,
+            "Power mode: higher tau (≈1) should attenuate less than tiny tau",
+        )
+        # Piecewise grace 0 vs 1
+        params_g0 = self.DEFAULT_PARAMS.copy()
+        params_g0.update(
+            {
+                "exit_factor_mode": "piecewise",
+                "exit_piecewise_grace": 0.0,
+                "exit_piecewise_slope": 1.0,
+            }
+        )
+        params_g1 = self.DEFAULT_PARAMS.copy()
+        params_g1.update(
+            {
+                "exit_factor_mode": "piecewise",
+                "exit_piecewise_grace": 1.0,
+                "exit_piecewise_slope": 1.0,
+            }
+        )
+        val_g0 = compute_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g0)
+        val_g1 = compute_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g1)
+        # With grace=1.0 no attenuation up to 1.0 ratio → value should be higher
+        self.assertGreater(
+            val_g1,
+            val_g0,
+            "Piecewise grace=1.0 should delay attenuation vs grace=0.0",
+        )
+        # Linear slope zero vs positive
+        params_lin0 = self.DEFAULT_PARAMS.copy()
+        params_lin0.update({"exit_factor_mode": "linear", "exit_linear_slope": 0.0})
+        params_lin1 = self.DEFAULT_PARAMS.copy()
+        params_lin1.update({"exit_factor_mode": "linear", "exit_linear_slope": 2.0})
+        val_lin0 = compute_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin0)
+        val_lin1 = compute_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin1)
+        self.assertGreater(
+            val_lin0,
+            val_lin1,
+            "Linear slope=0 should yield no attenuation vs slope>0",
+        )
+
+    def test_piecewise_slope_zero_constant_after_grace(self):
+        """Piecewise slope=0 should yield flat factor after grace boundary."""
+        from reward_space_analysis import compute_exit_factor
+        params = self.DEFAULT_PARAMS.copy()
+        params.update(
+            {
+                "exit_factor_mode": "piecewise",
+                "exit_piecewise_grace": 0.3,
+                "exit_piecewise_slope": 0.0,
+            }
+        )
+        base_factor = 100.0
+        pnl = 0.04
+        pnl_factor = 1.2
+        ratios = [0.3, 0.6, 1.0, 1.4]
+        values = [
+            compute_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios
+        ]
+        # All factors should be (approximately) identical after grace (no attenuation)
+        first = values[0]
+        for v in values[1:]:
+            self.assertAlmostEqualFloat(
+                v,
+                first,
+                tolerance=1e-9,
+                msg=f"Piecewise slope=0 factor drift at ratio set {ratios} => {values}",
+            )
+
+    def test_legacy_step_non_monotonic(self):
+        """Legacy mode applies step change at duration_ratio=1 (should not be monotonic)."""
+        from reward_space_analysis import compute_exit_factor
+        params = self.DEFAULT_PARAMS.copy()
+        params["exit_factor_mode"] = "legacy"
+        base_factor = 100.0
+        pnl = 0.02
+        pnl_factor = 1.0
+        # ratio below 1 vs above 1
+        below = compute_exit_factor(base_factor, pnl, pnl_factor, 0.5, params)
+        above = compute_exit_factor(base_factor, pnl, pnl_factor, 1.5, params)
+        # Legacy multiplies by 1.5 then 0.5 -> below should be > above * 2 (since (1.5)/(0.5)=3)
+        self.assertGreater(
+            below, above, "Legacy pre-threshold factor should exceed post-threshold"
+        )
+        ratio = below / max(above, 1e-12)
+        self.assertGreater(
+            ratio,
+            2.5,
+            f"Legacy step ratio unexpectedly small (expected ~3, got {ratio:.3f})",
+        )
+
+    def test_exit_factor_non_negative_with_positive_pnl(self):
+        """Exit factor must not be negative when pnl >= 0 (invariant clamp)."""
+        from reward_space_analysis import compute_exit_factor
+        params = self.DEFAULT_PARAMS.copy()
+        # Try multiple modes / extreme params
+        modes = ["linear", "power", "piecewise", "half_life", "sqrt", "legacy"]
+        base_factor = 100.0
+        pnl = 0.05
+        pnl_factor = 2.0  # amplified
+        for mode in modes:
+            params_mode = params.copy()
+            params_mode["exit_factor_mode"] = mode
+            val = compute_exit_factor(base_factor, pnl, pnl_factor, 2.0, params_mode)
+            self.assertGreaterEqual(
+                val, 0.0, f"Exit factor should be >=0 for non-negative pnl in mode {mode}"
+            )
+
+
+class TestParameterValidation(RewardSpaceTestBase):
+    """Tests for validate_reward_parameters adjustments and reasons."""
+
+    def test_validate_reward_parameters_adjustments(self):
+        from reward_space_analysis import validate_reward_parameters
+
+        raw = self.DEFAULT_PARAMS.copy()
+        # Introduce out-of-bound values
+        raw["idle_penalty_scale"] = -5.0  # < min 0
+        raw["efficiency_center"] = 2.0  # > max 1
+        raw["exit_power_tau"] = 0.0  # below min (treated as 1e-6)
+        sanitized, adjustments = validate_reward_parameters(raw)
+        # Idle penalty scale should be clamped to 0.0
+        self.assertEqual(sanitized["idle_penalty_scale"], 0.0)
+        self.assertIn("idle_penalty_scale", adjustments)
+        self.assertIsInstance(adjustments["idle_penalty_scale"]["reason"], str)
+        self.assertIn("min=0.0", str(adjustments["idle_penalty_scale"]["reason"]))
+        # Efficiency center should be clamped to 1.0
+        self.assertEqual(sanitized["efficiency_center"], 1.0)
+        self.assertIn("efficiency_center", adjustments)
+        self.assertIn("max=1.0", str(adjustments["efficiency_center"]["reason"]))
+        # exit_power_tau should be raised to lower bound (>=1e-6)
+        self.assertGreaterEqual(float(sanitized["exit_power_tau"]), 1e-6)
+        self.assertIn("exit_power_tau", adjustments)
+        self.assertIn("min=", str(adjustments["exit_power_tau"]["reason"]))
+
+    def test_idle_and_holding_penalty_power(self):
+        """Test non-linear scaling when penalty powers != 1."""
+        params = self.DEFAULT_PARAMS.copy()
+        params["idle_penalty_power"] = 2.0
+        params["max_idle_duration_candles"] = 100
+        base_factor = 90.0
+        profit_target = 0.03
+        # Idle penalties for durations 20 vs 40 (quadratic → (40/100)^2 / (20/100)^2 = (0.4^2)/(0.2^2)=4)
+        ctx_a = RewardContext(
+            pnl=0.0,
+            trade_duration=0,
+            idle_duration=20,
+            max_trade_duration=128,
+            max_unrealized_profit=0.0,
+            min_unrealized_profit=0.0,
+            position=Positions.Neutral,
+            action=Actions.Neutral,
+            force_action=None,
+        )
+        ctx_b = dataclasses.replace(ctx_a, idle_duration=40)
+        br_a = calculate_reward(
+            ctx_a,
+            params,
+            base_factor=base_factor,
+            profit_target=profit_target,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        br_b = calculate_reward(
+            ctx_b,
+            params,
+            base_factor=base_factor,
+            profit_target=profit_target,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        ratio_quadratic = 0.0
+        if br_a.idle_penalty != 0:
+            ratio_quadratic = br_b.idle_penalty / br_a.idle_penalty
+        # Both negative; absolute ratio should be close to 4
+        self.assertAlmostEqual(
+            abs(ratio_quadratic),
+            4.0,
+            delta=0.8,
+            msg=f"Idle penalty quadratic scaling mismatch (ratio={ratio_quadratic})",
+        )
+        # Holding penalty with power 2: durations just above threshold
+        params["holding_penalty_power"] = 2.0
+        ctx_h1 = RewardContext(
+            pnl=0.0,
+            trade_duration=130,
+            idle_duration=0,
+            max_trade_duration=100,
+            max_unrealized_profit=0.0,
+            min_unrealized_profit=0.0,
+            position=Positions.Long,
+            action=Actions.Neutral,
+            force_action=None,
+        )
+        ctx_h2 = dataclasses.replace(ctx_h1, trade_duration=140)
+        br_h1 = calculate_reward(
+            ctx_h1,
+            params,
+            base_factor=base_factor,
+            profit_target=profit_target,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        br_h2 = calculate_reward(
+            ctx_h2,
+            params,
+            base_factor=base_factor,
+            profit_target=profit_target,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        # Quadratic scaling: ((140-100)/(130-100))^2 = (40/30)^2 ≈ 1.777...
+        hold_ratio = 0.0
+        if br_h1.holding_penalty != 0:
+            hold_ratio = br_h2.holding_penalty / br_h1.holding_penalty
+        self.assertAlmostEqual(
+            abs(hold_ratio),
+            (40 / 30) ** 2,
+            delta=0.4,
+            msg=f"Holding penalty quadratic scaling mismatch (ratio={hold_ratio})",
+        )
+
+    def test_exit_factor_threshold_warning_emission(self):
+        """Ensure a RuntimeWarning is emitted when exit_factor exceeds threshold (no capping)."""
+        import warnings as _warnings
+
+        params = self.DEFAULT_PARAMS.copy()
+        params["exit_factor_threshold"] = 10.0  # low threshold to trigger easily
+        # Remove base_factor to allow argument override
+        params.pop("base_factor", None)
+        context = self._mk_context(pnl=0.06, trade_duration=10)
+        with _warnings.catch_warnings(record=True) as w:
+            _warnings.simplefilter("always")
+            br = calculate_reward(
+                context,
+                params,
+                base_factor=5000.0,  # large enough to exceed threshold
+                profit_target=0.03,
+                risk_reward_ratio=2.0,
+                short_allowed=True,
+                action_masking=True,
+            )
+            self.assertGreater(br.exit_component, 0.0)
+            emitted = [warn for warn in w if issubclass(warn.category, RuntimeWarning)]
+            self.assertTrue(
+                len(emitted) >= 1, "Expected a RuntimeWarning for exit factor threshold"
+            )
+            # Confirm message indicates threshold exceedance (supports legacy and new message format)
+            self.assertTrue(
+                any(
+                    (
+                        "exceeded threshold" in str(warn.message)
+                        or "exceeds threshold" in str(warn.message)
+                        or "|factor|=" in str(warn.message)
+                    )
+                    for warn in emitted
+                ),
+                "Warning message should indicate threshold exceedance",
+            )
+
+    def test_public_wrapper_compute_exit_factor(self):
+        """Basic sanity check of newly exposed compute_exit_factor wrapper."""
+        from reward_space_analysis import compute_exit_factor
+
+        params = self.DEFAULT_PARAMS.copy()
+        params["exit_factor_mode"] = "sqrt"
+        f1 = compute_exit_factor(100.0, 0.02, 1.0, 0.0, params)
+        f2 = compute_exit_factor(100.0, 0.02, 1.0, 1.0, params)
+        self.assertGreater(
+            f1, f2, "Attenuation should reduce factor at higher duration ratio"
+        )
+
 
 if __name__ == "__main__":
     # Configure test discovery and execution