From f29939717f69b7b04758fca7139368035b950176 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 6 Oct 2025 14:37:55 +0200 Subject: [PATCH] test(reward): add parameter validation, legacy step, piecewise slope=0, non-negative exit factor tests --- .../test_reward_space_analysis.py | 858 +++++++++++++++++- 1 file changed, 832 insertions(+), 26 deletions(-) diff --git a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py index 8ffa10b..5e61c74 100644 --- a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py @@ -7,7 +7,9 @@ Covers: - Reward alignment with environment """ +import dataclasses import json +import math import pickle import shutil import subprocess @@ -60,8 +62,26 @@ class RewardSpaceTestBase(unittest.TestCase): """Clean up temporary files.""" shutil.rmtree(self.temp_dir, ignore_errors=True) - def assertAlmostEqualFloat(self, first, second, tolerance=1e-6, msg=None): - """Helper for floating point comparisons.""" + def assertAlmostEqualFloat( + self, + first: float, + second: float, + tolerance: float = 1e-6, + msg: str | None = None, + ) -> None: + """Helper for floating point comparisons with explicit type hints. + + Parameters + ---------- + first : float + First value to compare. + second : float + Second value to compare. + tolerance : float, default 1e-6 + Absolute tolerance allowed between the two values. + msg : str | None + Optional message to display on failure. + """ if abs(first - second) > tolerance: self.fail(f"{first} != {second} within {tolerance}: {msg}") @@ -318,58 +338,298 @@ class TestRewardAlignment(RewardSpaceTestBase): ) def test_force_action_logic(self): - """Test force action behavior consistency.""" - # Take profit should generally be positive + """Validate forced exits (take profit, stop loss, timeout) produce consistent exit rewards. + + Algorithmic expectations: + - ForceActions override the provided action and trigger exit reward path. + - Exit reward sign should match PnL sign (exit_factor is positive under invariants). + - Take profit reward magnitude > stop loss reward magnitude for comparable |PnL|. + - Timeout uses current PnL (can be positive or negative); we assert sign consistency only. 
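+        - All three forced exits must leave invalid/idle/holding penalties at zero.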
+ """ + base_factor = 100.0 + profit_target = 0.06 + + # Take profit (positive pnl) tp_context = RewardContext( - pnl=0.05, # Good profit - trade_duration=20, + pnl=0.05, + trade_duration=50, idle_duration=0, max_trade_duration=100, - max_unrealized_profit=0.06, - min_unrealized_profit=0.04, + max_unrealized_profit=0.07, + min_unrealized_profit=0.01, position=Positions.Long, - action=Actions.Long_exit, + action=Actions.Neutral, # action ignored due to force_action force_action=ForceActions.Take_profit, ) - tp_breakdown = calculate_reward( tp_context, self.DEFAULT_PARAMS, - base_factor=100.0, - profit_target=0.06, + base_factor=base_factor, + profit_target=profit_target, risk_reward_ratio=2.0, short_allowed=True, action_masking=True, ) + self.assertGreater( + tp_breakdown.exit_component, + 0.0, + "Take profit should yield positive exit reward", + ) + # Exit reward should be the only active component + self.assertEqual(tp_breakdown.invalid_penalty, 0.0) + self.assertEqual(tp_breakdown.idle_penalty, 0.0) + self.assertEqual(tp_breakdown.holding_penalty, 0.0) + self.assertEqual(tp_breakdown.total, tp_breakdown.exit_component) + self.assertAlmostEqualFloat( + math.copysign(1, tp_breakdown.exit_component), + 1.0, + msg="TP reward sign mismatch", + ) - # Stop loss should generally be negative + # Stop loss (negative pnl) sl_context = RewardContext( - pnl=-0.03, # Loss - trade_duration=15, + pnl=-0.03, + trade_duration=50, idle_duration=0, max_trade_duration=100, max_unrealized_profit=0.01, - min_unrealized_profit=-0.04, + min_unrealized_profit=-0.05, position=Positions.Long, - action=Actions.Long_exit, + action=Actions.Neutral, force_action=ForceActions.Stop_loss, ) - sl_breakdown = calculate_reward( sl_context, self.DEFAULT_PARAMS, + base_factor=base_factor, + profit_target=profit_target, + risk_reward_ratio=2.0, + short_allowed=True, + action_masking=True, + ) + self.assertLess( + sl_breakdown.exit_component, + 0.0, + "Stop loss should yield negative exit reward", + ) + self.assertEqual(sl_breakdown.invalid_penalty, 0.0) + self.assertEqual(sl_breakdown.idle_penalty, 0.0) + self.assertEqual(sl_breakdown.holding_penalty, 0.0) + self.assertEqual(sl_breakdown.total, sl_breakdown.exit_component) + self.assertAlmostEqualFloat( + math.copysign(1, sl_breakdown.exit_component), + -1.0, + msg="SL reward sign mismatch", + ) + + # Timeout (use small positive pnl) + to_context = RewardContext( + pnl=0.01, + trade_duration=120, # beyond default max + idle_duration=0, + max_trade_duration=100, + max_unrealized_profit=0.02, + min_unrealized_profit=-0.01, + position=Positions.Long, + action=Actions.Neutral, + force_action=ForceActions.Timeout, + ) + to_breakdown = calculate_reward( + to_context, + self.DEFAULT_PARAMS, + base_factor=base_factor, + profit_target=profit_target, + risk_reward_ratio=2.0, + short_allowed=True, + action_masking=True, + ) + self.assertGreaterEqual( + to_breakdown.exit_component, + 0.0, + "Timeout reward should be non-negative with positive PnL", + ) + self.assertEqual(to_breakdown.invalid_penalty, 0.0) + self.assertEqual(to_breakdown.idle_penalty, 0.0) + self.assertEqual(to_breakdown.holding_penalty, 0.0) + self.assertEqual(to_breakdown.total, to_breakdown.exit_component) + + # Magnitude ordering: TP reward magnitude > SL reward magnitude (absolute values, given larger |pnl| for TP) + self.assertGreater( + abs(tp_breakdown.exit_component), + abs(sl_breakdown.exit_component), + "Take profit reward magnitude should exceed stop loss reward magnitude", + ) + + def 
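test_force_timeout_negative_pnl_sign(self):
+        """Timeout with negative PnL should yield a non-positive exit reward.
+
+        Hedged companion to test_force_action_logic: its docstring asserts sign
+        consistency for timeout PnL, and this sketch assumes that also holds for
+        losses.
+        """
+        context = RewardContext(
+            pnl=-0.02,
+            trade_duration=120,  # beyond default max
+            idle_duration=0,
+            max_trade_duration=100,
+            max_unrealized_profit=0.01,
+            min_unrealized_profit=-0.03,
+            position=Positions.Long,
+            action=Actions.Neutral,
+            force_action=ForceActions.Timeout,
+        )
+        breakdown = calculate_reward(
+            context,
+            self.DEFAULT_PARAMS,
+            base_factor=100.0,
+            profit_target=0.06,
+            risk_reward_ratio=2.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        self.assertLessEqual(
+            breakdown.exit_component,
+            0.0,
+            "Timeout with negative PnL should yield non-positive exit reward",
+        )
+
+    def 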
test_max_idle_duration_candles_logic(self): + """Idle penalty scaling test with explicit max_idle_duration_candles (no force-action comparisons).""" + params_small = self.DEFAULT_PARAMS.copy() + params_large = self.DEFAULT_PARAMS.copy() + # Activate explicit max idle durations + params_small["max_idle_duration_candles"] = 50 + params_large["max_idle_duration_candles"] = 200 + + base_factor = 100.0 + idle_duration = 40 # below large threshold, near small threshold + context = RewardContext( + pnl=0.0, + trade_duration=0, + idle_duration=idle_duration, + max_trade_duration=128, + max_unrealized_profit=0.0, + min_unrealized_profit=0.0, + position=Positions.Neutral, + action=Actions.Neutral, + force_action=None, + ) + + breakdown_small = calculate_reward( + context, + params_small, + base_factor, + profit_target=0.03, + risk_reward_ratio=1.0, + short_allowed=True, + action_masking=True, + ) + breakdown_large = calculate_reward( + context, + params_large, + base_factor, + profit_target=0.03, + risk_reward_ratio=1.0, + short_allowed=True, + action_masking=True, + ) + + self.assertLess(breakdown_small.idle_penalty, 0.0) + self.assertLess(breakdown_large.idle_penalty, 0.0) + # Because denominator is larger, absolute penalty (negative) should be smaller in magnitude + self.assertGreater( + breakdown_large.idle_penalty, + breakdown_small.idle_penalty, + f"Expected less severe penalty with larger max_idle_duration_candles (large={breakdown_large.idle_penalty}, small={breakdown_small.idle_penalty})", + ) + + def test_idle_penalty_fallback_and_proportionality(self): + """When max_idle_duration_candles <= 0, fallback to max_trade_duration and ensure proportional scaling. + + Also validates that penalty doubles (approximately) when idle_duration doubles (holding other params constant). + """ + params = self.DEFAULT_PARAMS.copy() + params["max_idle_duration_candles"] = 0 # force fallback + base_factor = 90.0 + profit_target = 0.03 + risk_reward_ratio = 1.0 + + # Two contexts with different idle durations + ctx_a = RewardContext( + pnl=0.0, + trade_duration=0, + idle_duration=20, + max_trade_duration=100, + max_unrealized_profit=0.0, + min_unrealized_profit=0.0, + position=Positions.Neutral, + action=Actions.Neutral, + force_action=None, + ) + ctx_b = dataclasses.replace(ctx_a, idle_duration=40) + + br_a = calculate_reward( + ctx_a, + params, + base_factor=base_factor, + profit_target=profit_target, + risk_reward_ratio=risk_reward_ratio, + short_allowed=True, + action_masking=True, + ) + br_b = calculate_reward( + ctx_b, + params, + base_factor=base_factor, + profit_target=profit_target, + risk_reward_ratio=risk_reward_ratio, + short_allowed=True, + action_masking=True, + ) + + self.assertLess(br_a.idle_penalty, 0.0) + self.assertLess(br_b.idle_penalty, 0.0) + + # Ratio of penalties should be close to 2 (linear power=1 scaling) allowing small numerical tolerance + ratio = ( + br_b.idle_penalty / br_a.idle_penalty if br_a.idle_penalty != 0 else None + ) + self.assertIsNotNone(ratio, "Idle penalty A should not be zero") + # Both penalties are negative, ratio should be ~ (40/20) = 2, hence ratio ~ 2 + self.assertAlmostEqualFloat( + abs(ratio), + 2.0, + tolerance=0.2, + msg=f"Idle penalty proportionality mismatch (ratio={ratio})", + ) + + def test_exit_factor_threshold_warning_non_capping(self): + """Ensure exit_factor_threshold does not cap the exit factor (warning-only semantics). 
+
+        We approximate by computing two rewards, a baseline and an amplified one (via a
+        larger base_factor), and ensuring the amplified reward scales proportionally
+        beyond the threshold rather than flattening.
+        """
+        params = self.DEFAULT_PARAMS.copy()
+        # Remove base_factor from params so that the function uses the provided argument (makes scaling observable)
+        params.pop("base_factor", None)
+        threshold = float(params.get("exit_factor_threshold", 10_000.0))
+
+        context = RewardContext(
+            pnl=0.08,  # above typical profit_target * RR
+            trade_duration=10,
+            idle_duration=0,
+            max_trade_duration=100,
+            max_unrealized_profit=0.09,
+            min_unrealized_profit=0.0,
+            position=Positions.Long,
+            action=Actions.Long_exit,
+            force_action=None,
+        )
+
+        # Baseline with moderate base_factor
+        baseline = calculate_reward(
+            context,
+            params,
             base_factor=100.0,
-            profit_target=0.06,
+            profit_target=0.03,
             risk_reward_ratio=2.0,
             short_allowed=True,
             action_masking=True,
         )
 
-        # Take profit should be better than stop loss
+        # Amplified: choose a much larger base_factor (well past the threshold scale)
+        amplified_base_factor = max(
+            100.0 * 50, threshold * 2.0 / max(context.pnl, 1e-9)
+        )
+        amplified = calculate_reward(
+            context,
+            params,
+            base_factor=amplified_base_factor,
+            profit_target=0.03,
+            risk_reward_ratio=2.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+
+        scale_observed = (
+            amplified.exit_component / baseline.exit_component
+            if baseline.exit_component
+            else 0.0
+        )
+        self.assertGreater(baseline.exit_component, 0.0)
+        self.assertGreater(amplified.exit_component, baseline.exit_component)
+        # Expect at least ~10x increase (safe margin) to confirm absence of clipping
         self.assertGreater(
-            tp_breakdown.total,
-            sl_breakdown.total,
-            "Take profit should yield higher reward than stop loss",
+            scale_observed,
+            10.0,
+            f"Amplified reward did not scale sufficiently (factor={scale_observed:.2f}, amplified_base_factor={amplified_base_factor})",
         )
 
     def test_exit_factor_calculation(self):
@@ -667,7 +927,6 @@ class TestStatisticalValidation(RewardSpaceTestBase):
 
     def test_exit_factor_mathematical_formulas(self):
         """Test mathematical correctness of exit factor calculations."""
-        import math
 
         from reward_space_analysis import (
             Actions,
@@ -701,8 +960,7 @@ class TestStatisticalValidation(RewardSpaceTestBase):
         )
 
         # Mathematical validation: alpha = -ln(tau) = -ln(0.5) ≈ 0.693
-        expected_alpha = -math.log(0.5)
-        expected_attenuation = 1.0 / (1.0 + expected_alpha * duration_ratio)
+        # Expected attenuation follows 1 / (1 + alpha * duration_ratio); validated qualitatively below.
 
         # The reward should be attenuated by this factor
         self.assertGreater(
@@ -1031,6 +1289,55 @@ class TestBoundaryConditions(RewardSpaceTestBase):
             f"Total reward should be finite for mode {mode}",
         )
 
+    def test_unknown_exit_factor_mode_fallback_piecewise(self):
+        """Unknown exit_factor_mode must fall back to piecewise attenuation."""
+        base_factor = 150.0
+        context = RewardContext(
+            pnl=0.05,
+            trade_duration=160,
+            idle_duration=0,
+            max_trade_duration=100,
+            max_unrealized_profit=0.07,
+            min_unrealized_profit=0.0,
+            position=Positions.Long,
+            action=Actions.Long_exit,
+            force_action=None,
+        )
+        params_piecewise = self.DEFAULT_PARAMS.copy()
+        params_piecewise["exit_factor_mode"] = "piecewise"
+        params_unknown = self.DEFAULT_PARAMS.copy()
+        params_unknown["exit_factor_mode"] = "unrecognized_mode_xyz"
+
+        reward_piecewise = calculate_reward(
+            context,
+            params_piecewise,
+            base_factor=base_factor,
+            profit_target=0.03,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        reward_unknown = calculate_reward(
+            context,
+            params_unknown,
+            base_factor=base_factor,
+            profit_target=0.03,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        self.assertGreater(
+            reward_piecewise.exit_component,
+            0.0,
+            "Piecewise exit reward should be positive with positive pnl",
+        )
+        self.assertAlmostEqualFloat(
+            reward_piecewise.exit_component,
+            reward_unknown.exit_component,
+            tolerance=1e-9,
+            msg="Fallback for unknown mode should produce identical result to piecewise",
+        )
+
 
 class TestHelperFunctions(RewardSpaceTestBase):
     """Test utility and helper functions."""
 
@@ -1545,6 +1852,505 @@ class TestPrivateFunctions(RewardSpaceTestBase):
                 f"Penalty should increase with duration: {penalties[i]} > {penalties[i-1]}",
             )
 
+    def test_new_invariant_and_warn_parameters(self):
+        """Ensure new tunables (check_invariants, exit_factor_threshold) exist and behave.
+
+        Uses a very large base_factor to trigger the potential warning condition without capping.
+        """
+        params = self.DEFAULT_PARAMS.copy()
+        self.assertIn("check_invariants", params)
+        self.assertIn("exit_factor_threshold", params)
+
+        base_factor = 1e7  # exaggerated factor
+        context = RewardContext(
+            pnl=0.05,
+            trade_duration=300,
+            idle_duration=0,
+            max_trade_duration=100,
+            max_unrealized_profit=0.06,
+            min_unrealized_profit=0.0,
+            position=Positions.Long,
+            action=Actions.Long_exit,
+            force_action=None,
+        )
+        breakdown = calculate_reward(
+            context,
+            params,
+            base_factor=base_factor,
+            profit_target=0.03,
+            risk_reward_ratio=1.0,
+            short_allowed=True,
+            action_masking=True,
+        )
+        self.assertTrue(
+            np.isfinite(breakdown.exit_component), "Exit component must be finite"
+        )
+
+
+class TestRewardRobustness(RewardSpaceTestBase):
+    """Tests implementing the prioritized robustness enhancements.
+
+    Covers:
+    - Reward decomposition integrity (total equals the single active component exactly)
+    - Exit factor monotonic attenuation per mode where mathematically expected
+    - Boundary parameter conditions (tau extremes, piecewise grace edges, linear slope = 0)
+    - Non-linear power tests for idle & holding penalties (power != 1)
+    - Public wrapper `compute_exit_factor` (avoids private function usage in new tests)
+    - Warning emission (exit_factor_threshold) without capping
+    """
+
+    def _mk_context(
+        self,
+        pnl: float = 0.04,
+        trade_duration: int = 40,
+        idle_duration: int = 0,
+        max_trade_duration: int = 100,
+        max_unrealized_profit: float = 0.05,
+        min_unrealized_profit: float = 0.01,
+        position: Positions = Positions.Long,
+        action: Actions = Actions.Long_exit,
+        force_action: ForceActions | None = None,
+    ) -> RewardContext:
+        return RewardContext(
+            pnl=pnl,
+            trade_duration=trade_duration,
+            idle_duration=idle_duration,
+            max_trade_duration=max_trade_duration,
+            max_unrealized_profit=max_unrealized_profit,
+            min_unrealized_profit=min_unrealized_profit,
+            position=position,
+            action=action,
+            force_action=force_action,
+        )
+
+    def test_decomposition_integrity(self):
+        """Assert reward total equals exactly the single active component (mutual exclusivity).
+
+        We sample a grid of mutually exclusive scenarios and validate the decomposition.
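+        Each scenario is constructed so that at most one reward pathway can fire.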
+        """
+        scenarios = [
+            # Idle penalty only
+            dict(
+                ctx=RewardContext(
+                    pnl=0.0,
+                    trade_duration=0,
+                    idle_duration=25,
+                    max_trade_duration=100,
+                    max_unrealized_profit=0.0,
+                    min_unrealized_profit=0.0,
+                    position=Positions.Neutral,
+                    action=Actions.Neutral,
+                    force_action=None,
+                ),
+                active="idle_penalty",
+            ),
+            # Holding penalty only
+            dict(
+                ctx=RewardContext(
+                    pnl=0.0,
+                    trade_duration=150,
+                    idle_duration=0,
+                    max_trade_duration=100,
+                    max_unrealized_profit=0.0,
+                    min_unrealized_profit=0.0,
+                    position=Positions.Long,
+                    action=Actions.Neutral,
+                    force_action=None,
+                ),
+                active="holding_penalty",
+            ),
+            # Exit reward only (positive pnl)
+            dict(
+                ctx=self._mk_context(pnl=0.03, trade_duration=60),
+                active="exit_component",
+            ),
+            # Invalid action only
+            dict(
+                ctx=RewardContext(
+                    pnl=0.01,
+                    trade_duration=10,
+                    idle_duration=0,
+                    max_trade_duration=100,
+                    max_unrealized_profit=0.02,
+                    min_unrealized_profit=0.0,
+                    position=Positions.Short,
+                    action=Actions.Long_exit,  # invalid
+                    force_action=None,
+                ),
+                active="invalid_penalty",
+            ),
+        ]
+        for sc in scenarios:  # type: ignore[var-annotated]
+            ctx_obj: RewardContext = sc["ctx"]  # type: ignore[index]
+            active_label: str = sc["active"]  # type: ignore[index]
+            with self.subTest(active=active_label):
+                br = calculate_reward(
+                    ctx_obj,
+                    self.DEFAULT_PARAMS,
+                    base_factor=100.0,
+                    profit_target=0.03,
+                    risk_reward_ratio=1.0,
+                    short_allowed=True,
+                    action_masking=(active_label != "invalid_penalty"),
+                )
+                components = [
+                    br.invalid_penalty,
+                    br.idle_penalty,
+                    br.holding_penalty,
+                    br.exit_component,
+                ]
+                non_zero = [
+                    c for c in components if not math.isclose(c, 0.0, abs_tol=1e-12)
+                ]
+                self.assertEqual(
+                    len(non_zero),
+                    1,
+                    f"Exactly one component must be active for {active_label}",
+                )
+                self.assertAlmostEqualFloat(
+                    br.total,
+                    non_zero[0],
+                    tolerance=1e-9,
+                    msg=f"Total mismatch for {active_label}",
+                )
+
+    def test_exit_factor_monotonic_attenuation(self):
+        """For attenuation modes, the factor should be non-increasing w.r.t. duration_ratio.
+
+        Modes covered: sqrt, linear, power, half_life, piecewise (after grace).
+        Legacy is excluded (non-monotonic by design: step change at duration_ratio = 1).
+        Piecewise has a flat grace region followed by monotonic attenuation.
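+        A tolerance of 1e-9 absorbs floating point noise in the pairwise comparisons.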
+ """ + from reward_space_analysis import compute_exit_factor + + modes = ["sqrt", "linear", "power", "half_life", "piecewise"] + base_factor = 100.0 + pnl = 0.05 + pnl_factor = 1.0 + for mode in modes: + params = self.DEFAULT_PARAMS.copy() + params["exit_factor_mode"] = mode + if mode == "linear": + params["exit_linear_slope"] = 1.2 + if mode == "piecewise": + params["exit_piecewise_grace"] = 0.2 + params["exit_piecewise_slope"] = 1.0 + if mode == "power": + params["exit_power_tau"] = 0.5 + if mode == "half_life": + params["exit_half_life"] = 0.7 + + ratios = np.linspace(0, 2, 15) + values = [ + compute_exit_factor(base_factor, pnl, pnl_factor, r, params) + for r in ratios + ] + # Piecewise: ignore initial flat region when checking monotonic decrease + if mode == "piecewise": + grace = float(params["exit_piecewise_grace"]) # type: ignore[index] + filtered = [(r, v) for r, v in zip(ratios, values) if r >= grace - 1e-9] + values_to_check = [v for _, v in filtered] + else: + values_to_check = values + for earlier, later in zip(values_to_check, values_to_check[1:]): + self.assertLessEqual( + later, + earlier + 1e-9, + f"Non-monotonic attenuation in mode={mode}", + ) + + def test_exit_factor_boundary_parameters(self): + """Test parameter edge cases: tau extremes, grace edges, slope zero.""" + from reward_space_analysis import compute_exit_factor + + base_factor = 50.0 + pnl = 0.02 + pnl_factor = 1.0 + # Tau near 1 (minimal attenuation) vs tau near 0 (strong attenuation) + params_hi = self.DEFAULT_PARAMS.copy() + params_hi.update({"exit_factor_mode": "power", "exit_power_tau": 0.999999}) + params_lo = self.DEFAULT_PARAMS.copy() + params_lo.update({"exit_factor_mode": "power", "exit_power_tau": 1e-6}) + r = 1.5 + hi_val = compute_exit_factor(base_factor, pnl, pnl_factor, r, params_hi) + lo_val = compute_exit_factor(base_factor, pnl, pnl_factor, r, params_lo) + self.assertGreater( + hi_val, + lo_val, + "Power mode: higher tau (≈1) should attenuate less than tiny tau", + ) + # Piecewise grace 0 vs 1 + params_g0 = self.DEFAULT_PARAMS.copy() + params_g0.update( + { + "exit_factor_mode": "piecewise", + "exit_piecewise_grace": 0.0, + "exit_piecewise_slope": 1.0, + } + ) + params_g1 = self.DEFAULT_PARAMS.copy() + params_g1.update( + { + "exit_factor_mode": "piecewise", + "exit_piecewise_grace": 1.0, + "exit_piecewise_slope": 1.0, + } + ) + val_g0 = compute_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g0) + val_g1 = compute_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g1) + # With grace=1.0 no attenuation up to 1.0 ratio → value should be higher + self.assertGreater( + val_g1, + val_g0, + "Piecewise grace=1.0 should delay attenuation vs grace=0.0", + ) + # Linear slope zero vs positive + params_lin0 = self.DEFAULT_PARAMS.copy() + params_lin0.update({"exit_factor_mode": "linear", "exit_linear_slope": 0.0}) + params_lin1 = self.DEFAULT_PARAMS.copy() + params_lin1.update({"exit_factor_mode": "linear", "exit_linear_slope": 2.0}) + val_lin0 = compute_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin0) + val_lin1 = compute_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin1) + self.assertGreater( + val_lin0, + val_lin1, + "Linear slope=0 should yield no attenuation vs slope>0", + ) + + def test_piecewise_slope_zero_constant_after_grace(self): + """Piecewise slope=0 should yield flat factor after grace boundary.""" + from reward_space_analysis import compute_exit_factor + params = self.DEFAULT_PARAMS.copy() + params.update( + { + "exit_factor_mode": "piecewise", + 
"exit_piecewise_grace": 0.3, + "exit_piecewise_slope": 0.0, + } + ) + base_factor = 100.0 + pnl = 0.04 + pnl_factor = 1.2 + ratios = [0.3, 0.6, 1.0, 1.4] + values = [ + compute_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios + ] + # All factors should be (approximately) identical after grace (no attenuation) + first = values[0] + for v in values[1:]: + self.assertAlmostEqualFloat( + v, + first, + tolerance=1e-9, + msg=f"Piecewise slope=0 factor drift at ratio set {ratios} => {values}", + ) + + def test_legacy_step_non_monotonic(self): + """Legacy mode applies step change at duration_ratio=1 (should not be monotonic).""" + from reward_space_analysis import compute_exit_factor + params = self.DEFAULT_PARAMS.copy() + params["exit_factor_mode"] = "legacy" + base_factor = 100.0 + pnl = 0.02 + pnl_factor = 1.0 + # ratio below 1 vs above 1 + below = compute_exit_factor(base_factor, pnl, pnl_factor, 0.5, params) + above = compute_exit_factor(base_factor, pnl, pnl_factor, 1.5, params) + # Legacy multiplies by 1.5 then 0.5 -> below should be > above * 2 (since (1.5)/(0.5)=3) + self.assertGreater( + below, above, "Legacy pre-threshold factor should exceed post-threshold" + ) + ratio = below / max(above, 1e-12) + self.assertGreater( + ratio, + 2.5, + f"Legacy step ratio unexpectedly small (expected ~3, got {ratio:.3f})", + ) + + def test_exit_factor_non_negative_with_positive_pnl(self): + """Exit factor must not be negative when pnl >= 0 (invariant clamp).""" + from reward_space_analysis import compute_exit_factor + params = self.DEFAULT_PARAMS.copy() + # Try multiple modes / extreme params + modes = ["linear", "power", "piecewise", "half_life", "sqrt", "legacy"] + base_factor = 100.0 + pnl = 0.05 + pnl_factor = 2.0 # amplified + for mode in modes: + params_mode = params.copy() + params_mode["exit_factor_mode"] = mode + val = compute_exit_factor(base_factor, pnl, pnl_factor, 2.0, params_mode) + self.assertGreaterEqual( + val, 0.0, f"Exit factor should be >=0 for non-negative pnl in mode {mode}" + ) + + +class TestParameterValidation(RewardSpaceTestBase): + """Tests for validate_reward_parameters adjustments and reasons.""" + + def test_validate_reward_parameters_adjustments(self): + from reward_space_analysis import validate_reward_parameters + + raw = self.DEFAULT_PARAMS.copy() + # Introduce out-of-bound values + raw["idle_penalty_scale"] = -5.0 # < min 0 + raw["efficiency_center"] = 2.0 # > max 1 + raw["exit_power_tau"] = 0.0 # below min (treated as 1e-6) + sanitized, adjustments = validate_reward_parameters(raw) + # Idle penalty scale should be clamped to 0.0 + self.assertEqual(sanitized["idle_penalty_scale"], 0.0) + self.assertIn("idle_penalty_scale", adjustments) + self.assertIsInstance(adjustments["idle_penalty_scale"]["reason"], str) + self.assertIn("min=0.0", str(adjustments["idle_penalty_scale"]["reason"])) + # Efficiency center should be clamped to 1.0 + self.assertEqual(sanitized["efficiency_center"], 1.0) + self.assertIn("efficiency_center", adjustments) + self.assertIn("max=1.0", str(adjustments["efficiency_center"]["reason"])) + # exit_power_tau should be raised to lower bound (>=1e-6) + self.assertGreaterEqual(float(sanitized["exit_power_tau"]), 1e-6) + self.assertIn("exit_power_tau", adjustments) + self.assertIn("min=", str(adjustments["exit_power_tau"]["reason"])) + + def test_idle_and_holding_penalty_power(self): + """Test non-linear scaling when penalty powers != 1.""" + params = self.DEFAULT_PARAMS.copy() + params["idle_penalty_power"] = 2.0 + 
params["max_idle_duration_candles"] = 100 + base_factor = 90.0 + profit_target = 0.03 + # Idle penalties for durations 20 vs 40 (quadratic → (40/100)^2 / (20/100)^2 = (0.4^2)/(0.2^2)=4) + ctx_a = RewardContext( + pnl=0.0, + trade_duration=0, + idle_duration=20, + max_trade_duration=128, + max_unrealized_profit=0.0, + min_unrealized_profit=0.0, + position=Positions.Neutral, + action=Actions.Neutral, + force_action=None, + ) + ctx_b = dataclasses.replace(ctx_a, idle_duration=40) + br_a = calculate_reward( + ctx_a, + params, + base_factor=base_factor, + profit_target=profit_target, + risk_reward_ratio=1.0, + short_allowed=True, + action_masking=True, + ) + br_b = calculate_reward( + ctx_b, + params, + base_factor=base_factor, + profit_target=profit_target, + risk_reward_ratio=1.0, + short_allowed=True, + action_masking=True, + ) + ratio_quadratic = 0.0 + if br_a.idle_penalty != 0: + ratio_quadratic = br_b.idle_penalty / br_a.idle_penalty + # Both negative; absolute ratio should be close to 4 + self.assertAlmostEqual( + abs(ratio_quadratic), + 4.0, + delta=0.8, + msg=f"Idle penalty quadratic scaling mismatch (ratio={ratio_quadratic})", + ) + # Holding penalty with power 2: durations just above threshold + params["holding_penalty_power"] = 2.0 + ctx_h1 = RewardContext( + pnl=0.0, + trade_duration=130, + idle_duration=0, + max_trade_duration=100, + max_unrealized_profit=0.0, + min_unrealized_profit=0.0, + position=Positions.Long, + action=Actions.Neutral, + force_action=None, + ) + ctx_h2 = dataclasses.replace(ctx_h1, trade_duration=140) + br_h1 = calculate_reward( + ctx_h1, + params, + base_factor=base_factor, + profit_target=profit_target, + risk_reward_ratio=1.0, + short_allowed=True, + action_masking=True, + ) + br_h2 = calculate_reward( + ctx_h2, + params, + base_factor=base_factor, + profit_target=profit_target, + risk_reward_ratio=1.0, + short_allowed=True, + action_masking=True, + ) + # Quadratic scaling: ((140-100)/(130-100))^2 = (40/30)^2 ≈ 1.777... 
+        hold_ratio = 0.0
+        if br_h1.holding_penalty != 0:
+            hold_ratio = br_h2.holding_penalty / br_h1.holding_penalty
+        self.assertAlmostEqual(
+            abs(hold_ratio),
+            (40 / 30) ** 2,
+            delta=0.4,
+            msg=f"Holding penalty quadratic scaling mismatch (ratio={hold_ratio})",
+        )
+
+    def test_exit_factor_threshold_warning_emission(self):
+        """Ensure a RuntimeWarning is emitted when exit_factor exceeds the threshold (no capping)."""
+        import warnings as _warnings
+
+        params = self.DEFAULT_PARAMS.copy()
+        params["exit_factor_threshold"] = 10.0  # low threshold to trigger easily
+        # Remove base_factor to allow argument override
+        params.pop("base_factor", None)
+        # This class does not inherit _mk_context, so build the context explicitly.
+        context = RewardContext(
+            pnl=0.06,
+            trade_duration=10,
+            idle_duration=0,
+            max_trade_duration=100,
+            max_unrealized_profit=0.07,
+            min_unrealized_profit=0.01,
+            position=Positions.Long,
+            action=Actions.Long_exit,
+            force_action=None,
+        )
+        with _warnings.catch_warnings(record=True) as w:
+            _warnings.simplefilter("always")
+            br = calculate_reward(
+                context,
+                params,
+                base_factor=5000.0,  # large enough to exceed threshold
+                profit_target=0.03,
+                risk_reward_ratio=2.0,
+                short_allowed=True,
+                action_masking=True,
+            )
+        self.assertGreater(br.exit_component, 0.0)
+        emitted = [warn for warn in w if issubclass(warn.category, RuntimeWarning)]
+        self.assertGreaterEqual(
+            len(emitted), 1, "Expected a RuntimeWarning for exit factor threshold"
+        )
+        # Confirm the message indicates threshold exceedance (supports legacy and new message formats)
+        self.assertTrue(
+            any(
+                (
+                    "exceeded threshold" in str(warn.message)
+                    or "exceeds threshold" in str(warn.message)
+                    or "|factor|=" in str(warn.message)
+                )
+                for warn in emitted
+            ),
+            "Warning message should indicate threshold exceedance",
+        )
+
+    def test_public_wrapper_compute_exit_factor(self):
+        """Basic sanity check of the newly exposed compute_exit_factor wrapper."""
+        from reward_space_analysis import compute_exit_factor
+
+        params = self.DEFAULT_PARAMS.copy()
+        params["exit_factor_mode"] = "sqrt"
+        f1 = compute_exit_factor(100.0, 0.02, 1.0, 0.0, params)
+        f2 = compute_exit_factor(100.0, 0.02, 1.0, 1.0, params)
+        self.assertGreater(
+            f1, f2, "Attenuation should reduce factor at higher duration ratio"
+        )
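+
+    def test_compute_exit_factor_base_factor_proportionality(self):
+        """Hedged sketch: exit factor is assumed to scale ~linearly with base_factor.
+
+        The non-capping test above observes >10x scaling for a ~50x base_factor;
+        strict 2x proportionality here is an assumption, not a documented contract.
+        """
+        from reward_space_analysis import compute_exit_factor
+
+        params = self.DEFAULT_PARAMS.copy()
+        params["exit_factor_mode"] = "sqrt"
+        # Remove base_factor so the explicit argument is the one exercised
+        params.pop("base_factor", None)
+        f_small = compute_exit_factor(100.0, 0.02, 1.0, 0.5, params)
+        f_big = compute_exit_factor(200.0, 0.02, 1.0, 0.5, params)
+        self.assertGreater(f_small, 0.0)
+        self.assertAlmostEqualFloat(
+            f_big / f_small,
+            2.0,
+            tolerance=0.2,
+            msg="Doubling base_factor should roughly double the exit factor",
+        )
+
 
 if __name__ == "__main__":
     # Configure test discovery and execution
-- 
2.43.0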