- Reward alignment with environment
"""
+import dataclasses
import json
+import math
import pickle
import shutil
import subprocess
"""Clean up temporary files."""
shutil.rmtree(self.temp_dir, ignore_errors=True)
- def assertAlmostEqualFloat(self, first, second, tolerance=1e-6, msg=None):
- """Helper for floating point comparisons."""
+ def assertAlmostEqualFloat(
+ self,
+ first: float,
+ second: float,
+ tolerance: float = 1e-6,
+ msg: str | None = None,
+ ) -> None:
+ """Helper for floating point comparisons with explicit type hints.
+
+ Parameters
+ ----------
+ first : float
+ First value to compare.
+ second : float
+ Second value to compare.
+ tolerance : float, default 1e-6
+ Absolute tolerance allowed between the two values.
+ msg : str | None
+ Optional message to display on failure.
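+
+        Notes
+        -----
+        Unlike ``unittest.TestCase.assertAlmostEqual`` (which rounds the
+        difference to a number of decimal places), this helper compares against
+        an absolute tolerance, e.g. ``abs((0.1 + 0.2) - 0.3) <= 1e-6`` holds.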
+ """
        if abs(first - second) > tolerance:
            self.fail(
                f"{first} != {second} within {tolerance}: {msg}"
            )
def test_force_action_logic(self):
- """Test force action behavior consistency."""
- # Take profit should generally be positive
+ """Validate forced exits (take profit, stop loss, timeout) produce consistent exit rewards.
+
+ Algorithmic expectations:
+ - ForceActions override the provided action and trigger exit reward path.
+ - Exit reward sign should match PnL sign (exit_factor is positive under invariants).
+        - Take profit reward magnitude > stop loss reward magnitude here (TP carries the larger |PnL|).
+ - Timeout uses current PnL (can be positive or negative); we assert sign consistency only.
+ """
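+        # Hedged sketch of the assumed reward shape (not asserted verbatim against the
+        # implementation): exit_reward ≈ pnl * exit_factor(...), and since exit_factor >= 0
+        # under invariants, sign(exit_reward) == sign(pnl).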
+ base_factor = 100.0
+ profit_target = 0.06
+
+ # Take profit (positive pnl)
tp_context = RewardContext(
- pnl=0.05, # Good profit
- trade_duration=20,
+ pnl=0.05,
+ trade_duration=50,
idle_duration=0,
max_trade_duration=100,
- max_unrealized_profit=0.06,
- min_unrealized_profit=0.04,
+ max_unrealized_profit=0.07,
+ min_unrealized_profit=0.01,
position=Positions.Long,
- action=Actions.Long_exit,
+ action=Actions.Neutral, # action ignored due to force_action
force_action=ForceActions.Take_profit,
)
-
tp_breakdown = calculate_reward(
tp_context,
self.DEFAULT_PARAMS,
- base_factor=100.0,
- profit_target=0.06,
+ base_factor=base_factor,
+ profit_target=profit_target,
risk_reward_ratio=2.0,
short_allowed=True,
action_masking=True,
)
+ self.assertGreater(
+ tp_breakdown.exit_component,
+ 0.0,
+ "Take profit should yield positive exit reward",
+ )
+ # Exit reward should be the only active component
+ self.assertEqual(tp_breakdown.invalid_penalty, 0.0)
+ self.assertEqual(tp_breakdown.idle_penalty, 0.0)
+ self.assertEqual(tp_breakdown.holding_penalty, 0.0)
+ self.assertEqual(tp_breakdown.total, tp_breakdown.exit_component)
+ self.assertAlmostEqualFloat(
+ math.copysign(1, tp_breakdown.exit_component),
+ 1.0,
+ msg="TP reward sign mismatch",
+ )
- # Stop loss should generally be negative
+ # Stop loss (negative pnl)
sl_context = RewardContext(
- pnl=-0.03, # Loss
- trade_duration=15,
+ pnl=-0.03,
+ trade_duration=50,
idle_duration=0,
max_trade_duration=100,
max_unrealized_profit=0.01,
- min_unrealized_profit=-0.04,
+ min_unrealized_profit=-0.05,
position=Positions.Long,
- action=Actions.Long_exit,
+ action=Actions.Neutral,
force_action=ForceActions.Stop_loss,
)
-
sl_breakdown = calculate_reward(
sl_context,
self.DEFAULT_PARAMS,
+ base_factor=base_factor,
+ profit_target=profit_target,
+ risk_reward_ratio=2.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ self.assertLess(
+ sl_breakdown.exit_component,
+ 0.0,
+ "Stop loss should yield negative exit reward",
+ )
+ self.assertEqual(sl_breakdown.invalid_penalty, 0.0)
+ self.assertEqual(sl_breakdown.idle_penalty, 0.0)
+ self.assertEqual(sl_breakdown.holding_penalty, 0.0)
+ self.assertEqual(sl_breakdown.total, sl_breakdown.exit_component)
+ self.assertAlmostEqualFloat(
+ math.copysign(1, sl_breakdown.exit_component),
+ -1.0,
+ msg="SL reward sign mismatch",
+ )
+
+ # Timeout (use small positive pnl)
+ to_context = RewardContext(
+ pnl=0.01,
+ trade_duration=120, # beyond default max
+ idle_duration=0,
+ max_trade_duration=100,
+ max_unrealized_profit=0.02,
+ min_unrealized_profit=-0.01,
+ position=Positions.Long,
+ action=Actions.Neutral,
+ force_action=ForceActions.Timeout,
+ )
+ to_breakdown = calculate_reward(
+ to_context,
+ self.DEFAULT_PARAMS,
+ base_factor=base_factor,
+ profit_target=profit_target,
+ risk_reward_ratio=2.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ self.assertGreaterEqual(
+ to_breakdown.exit_component,
+ 0.0,
+ "Timeout reward should be non-negative with positive PnL",
+ )
+ self.assertEqual(to_breakdown.invalid_penalty, 0.0)
+ self.assertEqual(to_breakdown.idle_penalty, 0.0)
+ self.assertEqual(to_breakdown.holding_penalty, 0.0)
+ self.assertEqual(to_breakdown.total, to_breakdown.exit_component)
+
+ # Magnitude ordering: TP reward magnitude > SL reward magnitude (absolute values, given larger |pnl| for TP)
+ self.assertGreater(
+ abs(tp_breakdown.exit_component),
+ abs(sl_breakdown.exit_component),
+ "Take profit reward magnitude should exceed stop loss reward magnitude",
+ )
+
+ def test_max_idle_duration_candles_logic(self):
+ """Idle penalty scaling test with explicit max_idle_duration_candles (no force-action comparisons)."""
+ params_small = self.DEFAULT_PARAMS.copy()
+ params_large = self.DEFAULT_PARAMS.copy()
+ # Activate explicit max idle durations
+ params_small["max_idle_duration_candles"] = 50
+ params_large["max_idle_duration_candles"] = 200
+
+ base_factor = 100.0
+ idle_duration = 40 # below large threshold, near small threshold
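+        # Assumed penalty shape (hedged): idle_penalty ∝ -(idle_duration / max_idle)^power.
+        # With power = 1 the relative magnitudes are 40/50 = 0.8 vs 40/200 = 0.2, so the
+        # large-threshold penalty should be roughly 4x milder in magnitude.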
+ context = RewardContext(
+ pnl=0.0,
+ trade_duration=0,
+ idle_duration=idle_duration,
+ max_trade_duration=128,
+ max_unrealized_profit=0.0,
+ min_unrealized_profit=0.0,
+ position=Positions.Neutral,
+ action=Actions.Neutral,
+ force_action=None,
+ )
+
+ breakdown_small = calculate_reward(
+ context,
+ params_small,
+ base_factor,
+ profit_target=0.03,
+ risk_reward_ratio=1.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ breakdown_large = calculate_reward(
+ context,
+ params_large,
+ base_factor,
+ profit_target=0.03,
+ risk_reward_ratio=1.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+
+ self.assertLess(breakdown_small.idle_penalty, 0.0)
+ self.assertLess(breakdown_large.idle_penalty, 0.0)
+ # Because denominator is larger, absolute penalty (negative) should be smaller in magnitude
+ self.assertGreater(
+ breakdown_large.idle_penalty,
+ breakdown_small.idle_penalty,
+ f"Expected less severe penalty with larger max_idle_duration_candles (large={breakdown_large.idle_penalty}, small={breakdown_small.idle_penalty})",
+ )
+
+ def test_idle_penalty_fallback_and_proportionality(self):
+ """When max_idle_duration_candles <= 0, fallback to max_trade_duration and ensure proportional scaling.
+
+ Also validates that penalty doubles (approximately) when idle_duration doubles (holding other params constant).
+ """
+ params = self.DEFAULT_PARAMS.copy()
+ params["max_idle_duration_candles"] = 0 # force fallback
+ base_factor = 90.0
+ profit_target = 0.03
+ risk_reward_ratio = 1.0
+
+ # Two contexts with different idle durations
+ ctx_a = RewardContext(
+ pnl=0.0,
+ trade_duration=0,
+ idle_duration=20,
+ max_trade_duration=100,
+ max_unrealized_profit=0.0,
+ min_unrealized_profit=0.0,
+ position=Positions.Neutral,
+ action=Actions.Neutral,
+ force_action=None,
+ )
+ ctx_b = dataclasses.replace(ctx_a, idle_duration=40)
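+        # With the fallback denominator (max_trade_duration = 100) and linear power, the
+        # magnitudes should scale as 20/100 vs 40/100, i.e. a ratio of ~2 (asserted below).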
+
+ br_a = calculate_reward(
+ ctx_a,
+ params,
+ base_factor=base_factor,
+ profit_target=profit_target,
+ risk_reward_ratio=risk_reward_ratio,
+ short_allowed=True,
+ action_masking=True,
+ )
+ br_b = calculate_reward(
+ ctx_b,
+ params,
+ base_factor=base_factor,
+ profit_target=profit_target,
+ risk_reward_ratio=risk_reward_ratio,
+ short_allowed=True,
+ action_masking=True,
+ )
+
+ self.assertLess(br_a.idle_penalty, 0.0)
+ self.assertLess(br_b.idle_penalty, 0.0)
+
+ # Ratio of penalties should be close to 2 (linear power=1 scaling) allowing small numerical tolerance
+ ratio = (
+ br_b.idle_penalty / br_a.idle_penalty if br_a.idle_penalty != 0 else None
+ )
+ self.assertIsNotNone(ratio, "Idle penalty A should not be zero")
+        # Both penalties are negative, so the ratio is positive and should be ~ (40/20) = 2.
+ self.assertAlmostEqualFloat(
+ abs(ratio),
+ 2.0,
+ tolerance=0.2,
+ msg=f"Idle penalty proportionality mismatch (ratio={ratio})",
+ )
+
+ def test_exit_factor_threshold_warning_non_capping(self):
+ """Ensure exit_factor_threshold does not cap the exit factor (warning-only semantics).
+
+        We approximate by computing two rewards, a baseline and an amplified one (via a much
+        larger base_factor on an identical context), and ensure the amplified reward scales
+        proportionally beyond the threshold rather than flattening.
+ """
+ params = self.DEFAULT_PARAMS.copy()
+ # Remove base_factor from params so that the function uses the provided argument (makes scaling observable)
+ params.pop("base_factor", None)
+ threshold = float(params.get("exit_factor_threshold", 10_000.0))
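+        # Assumption behind this check: exit_component scales roughly linearly with
+        # base_factor, so a ~50x larger base_factor should produce a ~50x larger
+        # exit reward if no capping occurs at the threshold.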
+
+ context = RewardContext(
+ pnl=0.08, # above typical profit_target * RR
+ trade_duration=10,
+ idle_duration=0,
+ max_trade_duration=100,
+ max_unrealized_profit=0.09,
+ min_unrealized_profit=0.0,
+ position=Positions.Long,
+ action=Actions.Long_exit,
+ force_action=None,
+ )
+
+ # Baseline with moderate base_factor
+ baseline = calculate_reward(
+ context,
+ params,
base_factor=100.0,
- profit_target=0.06,
+ profit_target=0.03,
risk_reward_ratio=2.0,
short_allowed=True,
action_masking=True,
)
- # Take profit should be better than stop loss
+        # Amplified: pick a much larger base_factor, sized so base_factor * pnl is at least 2x the threshold
+ amplified_base_factor = max(
+ 100.0 * 50, threshold * 2.0 / max(context.pnl, 1e-9)
+ )
+ amplified = calculate_reward(
+ context,
+ params,
+ base_factor=amplified_base_factor,
+ profit_target=0.03,
+ risk_reward_ratio=2.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+
+ scale_observed = (
+ amplified.exit_component / baseline.exit_component
+ if baseline.exit_component
+ else 0.0
+ )
+ self.assertGreater(baseline.exit_component, 0.0)
+ self.assertGreater(amplified.exit_component, baseline.exit_component)
+ # Expect at least ~10x increase (safe margin) to confirm absence of clipping
self.assertGreater(
- tp_breakdown.total,
- sl_breakdown.total,
- "Take profit should yield higher reward than stop loss",
+ scale_observed,
+ 10.0,
+ f"Amplified reward did not scale sufficiently (factor={scale_observed:.2f}, amplified_base_factor={amplified_base_factor})",
)
def test_exit_factor_calculation(self):
def test_exit_factor_mathematical_formulas(self):
"""Test mathematical correctness of exit factor calculations."""
- import math
from reward_space_analysis import (
Actions,
)
# Mathematical validation: alpha = -ln(tau) = -ln(0.5) ≈ 0.693
- expected_alpha = -math.log(0.5)
- expected_attenuation = 1.0 / (1.0 + expected_alpha * duration_ratio)
+        # Explicit recomputation removed; alpha = -ln(0.5) ≈ 0.693 is documented above.
# The reward should be attenuated by this factor
self.assertGreater(
f"Total reward should be finite for mode {mode}",
)
+ def test_unknown_exit_factor_mode_fallback_piecewise(self):
+ """Unknown exit_factor_mode must fallback to piecewise attenuation."""
+ base_factor = 150.0
+ context = RewardContext(
+ pnl=0.05,
+ trade_duration=160,
+ idle_duration=0,
+ max_trade_duration=100,
+ max_unrealized_profit=0.07,
+ min_unrealized_profit=0.0,
+ position=Positions.Long,
+ action=Actions.Long_exit,
+ force_action=None,
+ )
+ params_piecewise = self.DEFAULT_PARAMS.copy()
+ params_piecewise["exit_factor_mode"] = "piecewise"
+ params_unknown = self.DEFAULT_PARAMS.copy()
+ params_unknown["exit_factor_mode"] = "unrecognized_mode_xyz"
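+        # Assumption: the implementation treats any unrecognized mode string as piecewise,
+        # so the two breakdowns below should match to numerical precision.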
+
+ reward_piecewise = calculate_reward(
+ context,
+ params_piecewise,
+ base_factor=base_factor,
+ profit_target=0.03,
+ risk_reward_ratio=1.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ reward_unknown = calculate_reward(
+ context,
+ params_unknown,
+ base_factor=base_factor,
+ profit_target=0.03,
+ risk_reward_ratio=1.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ self.assertGreater(
+ reward_piecewise.exit_component,
+ 0.0,
+ "Piecewise exit reward should be positive with positive pnl",
+ )
+ self.assertAlmostEqualFloat(
+ reward_piecewise.exit_component,
+ reward_unknown.exit_component,
+ tolerance=1e-9,
+ msg="Fallback for unknown mode should produce identical result to piecewise",
+ )
+
class TestHelperFunctions(RewardSpaceTestBase):
"""Test utility and helper functions."""
f"Penalty should increase with duration: {penalties[i]} > {penalties[i-1]}",
)
+ def test_new_invariant_and_warn_parameters(self):
+        """Ensure new tunables (check_invariants, exit_factor_threshold) exist and keep rewards finite.
+
+ Uses a very large base_factor to trigger potential warning condition without capping.
+ """
+ params = self.DEFAULT_PARAMS.copy()
+ self.assertIn("check_invariants", params)
+ self.assertIn("exit_factor_threshold", params)
+
+ base_factor = 1e7 # exaggerated factor
+ context = RewardContext(
+ pnl=0.05,
+ trade_duration=300,
+ idle_duration=0,
+ max_trade_duration=100,
+ max_unrealized_profit=0.06,
+ min_unrealized_profit=0.0,
+ position=Positions.Long,
+ action=Actions.Long_exit,
+ force_action=None,
+ )
+ breakdown = calculate_reward(
+ context,
+ params,
+ base_factor=base_factor,
+ profit_target=0.03,
+ risk_reward_ratio=1.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ self.assertTrue(
+ np.isfinite(breakdown.exit_component), "Exit component must be finite"
+ )
+
+
+class TestRewardRobustness(RewardSpaceTestBase):
+ """Tests implementing all prioritized robustness enhancements.
+
+ Covers:
+    - Reward decomposition integrity (total equals the single active component exactly)
+ - Exit factor monotonic attenuation per mode where mathematically expected
+ - Boundary parameter conditions (tau extremes, piecewise grace edges, linear slope = 0)
+ - Non-linear power tests for idle & holding penalties (power != 1)
+ - Public wrapper `compute_exit_factor` (avoids private function usage in new tests)
+ - Warning emission (exit_factor_threshold) without capping
+ """
+
+ def _mk_context(
+ self,
+ pnl: float = 0.04,
+ trade_duration: int = 40,
+ idle_duration: int = 0,
+ max_trade_duration: int = 100,
+ max_unrealized_profit: float = 0.05,
+ min_unrealized_profit: float = 0.01,
+ position: Positions = Positions.Long,
+ action: Actions = Actions.Long_exit,
+ force_action: ForceActions | None = None,
+ ) -> RewardContext:
+ return RewardContext(
+ pnl=pnl,
+ trade_duration=trade_duration,
+ idle_duration=idle_duration,
+ max_trade_duration=max_trade_duration,
+ max_unrealized_profit=max_unrealized_profit,
+ min_unrealized_profit=min_unrealized_profit,
+ position=position,
+ action=action,
+ force_action=force_action,
+ )
+
+ def test_decomposition_integrity(self):
+ """Assert reward_total equals exactly the single active component (mutual exclusivity).
+
+ We sample a grid of mutually exclusive scenarios and validate decomposition.
+ """
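+        # Invariant under test: total == invalid_penalty + idle_penalty
+        # + holding_penalty + exit_component, with exactly one non-zero term.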
+ scenarios = [
+ # Idle penalty only
+ dict(
+ ctx=RewardContext(
+ pnl=0.0,
+ trade_duration=0,
+ idle_duration=25,
+ max_trade_duration=100,
+ max_unrealized_profit=0.0,
+ min_unrealized_profit=0.0,
+ position=Positions.Neutral,
+ action=Actions.Neutral,
+ force_action=None,
+ ),
+ active="idle_penalty",
+ ),
+ # Holding penalty only
+ dict(
+ ctx=RewardContext(
+ pnl=0.0,
+ trade_duration=150,
+ idle_duration=0,
+ max_trade_duration=100,
+ max_unrealized_profit=0.0,
+ min_unrealized_profit=0.0,
+ position=Positions.Long,
+ action=Actions.Neutral,
+ force_action=None,
+ ),
+ active="holding_penalty",
+ ),
+ # Exit reward only (positive pnl)
+ dict(
+ ctx=self._mk_context(pnl=0.03, trade_duration=60),
+ active="exit_component",
+ ),
+ # Invalid action only
+ dict(
+ ctx=RewardContext(
+ pnl=0.01,
+ trade_duration=10,
+ idle_duration=0,
+ max_trade_duration=100,
+ max_unrealized_profit=0.02,
+ min_unrealized_profit=0.0,
+ position=Positions.Short,
+ action=Actions.Long_exit, # invalid
+ force_action=None,
+ ),
+ active="invalid_penalty",
+ ),
+ ]
+ for sc in scenarios: # type: ignore[var-annotated]
+ ctx_obj: RewardContext = sc["ctx"] # type: ignore[index]
+ active_label: str = sc["active"] # type: ignore[index]
+ with self.subTest(active=active_label):
+ br = calculate_reward(
+ ctx_obj,
+ self.DEFAULT_PARAMS,
+ base_factor=100.0,
+ profit_target=0.03,
+ risk_reward_ratio=1.0,
+ short_allowed=True,
+ action_masking=(active_label != "invalid_penalty"),
+ )
+ components = [
+ br.invalid_penalty,
+ br.idle_penalty,
+ br.holding_penalty,
+ br.exit_component,
+ ]
+ non_zero = [
+ c for c in components if not math.isclose(c, 0.0, abs_tol=1e-12)
+ ]
+                self.assertEqual(
+                    len(non_zero),
+                    1,
+                    f"Exactly one component must be active for {active_label}",
+                )
+                self.assertAlmostEqualFloat(
+                    br.total,
+                    non_zero[0],
+                    tolerance=1e-9,
+                    msg=f"Total mismatch for {active_label}",
+                )
+
+ def test_exit_factor_monotonic_attenuation(self):
+ """For attenuation modes: factor should be non-increasing w.r.t duration_ratio.
+
+ Modes covered: sqrt, linear, power, half_life, piecewise (after grace).
+ Legacy is excluded (non-monotonic by design: step change). Piecewise includes flat grace then monotonic.
+ """
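+        # Hedged note: the exact attenuation formula is mode-specific (the power mode is
+        # documented elsewhere in this file as 1 / (1 + alpha * r) with alpha = -ln(tau));
+        # the shared property exercised here is that each factor is non-increasing in r.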
+ from reward_space_analysis import compute_exit_factor
+
+ modes = ["sqrt", "linear", "power", "half_life", "piecewise"]
+ base_factor = 100.0
+ pnl = 0.05
+ pnl_factor = 1.0
+ for mode in modes:
+ params = self.DEFAULT_PARAMS.copy()
+ params["exit_factor_mode"] = mode
+ if mode == "linear":
+ params["exit_linear_slope"] = 1.2
+ if mode == "piecewise":
+ params["exit_piecewise_grace"] = 0.2
+ params["exit_piecewise_slope"] = 1.0
+ if mode == "power":
+ params["exit_power_tau"] = 0.5
+ if mode == "half_life":
+ params["exit_half_life"] = 0.7
+
+ ratios = np.linspace(0, 2, 15)
+ values = [
+ compute_exit_factor(base_factor, pnl, pnl_factor, r, params)
+ for r in ratios
+ ]
+ # Piecewise: ignore initial flat region when checking monotonic decrease
+ if mode == "piecewise":
+ grace = float(params["exit_piecewise_grace"]) # type: ignore[index]
+ filtered = [(r, v) for r, v in zip(ratios, values) if r >= grace - 1e-9]
+ values_to_check = [v for _, v in filtered]
+ else:
+ values_to_check = values
+ for earlier, later in zip(values_to_check, values_to_check[1:]):
+ self.assertLessEqual(
+ later,
+ earlier + 1e-9,
+ f"Non-monotonic attenuation in mode={mode}",
+ )
+
+ def test_exit_factor_boundary_parameters(self):
+ """Test parameter edge cases: tau extremes, grace edges, slope zero."""
+ from reward_space_analysis import compute_exit_factor
+
+ base_factor = 50.0
+ pnl = 0.02
+ pnl_factor = 1.0
+ # Tau near 1 (minimal attenuation) vs tau near 0 (strong attenuation)
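+        # Worked expectation under the assumed power-mode formula 1 / (1 + alpha * r)
+        # with alpha = -ln(tau): tau = 0.999999 gives alpha ≈ 1e-6 (factor ≈ base), while
+        # tau = 1e-6 gives alpha ≈ 13.8, so at r = 1.5 the factor drops to ≈ base / 21.7.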
+ params_hi = self.DEFAULT_PARAMS.copy()
+ params_hi.update({"exit_factor_mode": "power", "exit_power_tau": 0.999999})
+ params_lo = self.DEFAULT_PARAMS.copy()
+ params_lo.update({"exit_factor_mode": "power", "exit_power_tau": 1e-6})
+ r = 1.5
+ hi_val = compute_exit_factor(base_factor, pnl, pnl_factor, r, params_hi)
+ lo_val = compute_exit_factor(base_factor, pnl, pnl_factor, r, params_lo)
+ self.assertGreater(
+ hi_val,
+ lo_val,
+ "Power mode: higher tau (≈1) should attenuate less than tiny tau",
+ )
+ # Piecewise grace 0 vs 1
+ params_g0 = self.DEFAULT_PARAMS.copy()
+ params_g0.update(
+ {
+ "exit_factor_mode": "piecewise",
+ "exit_piecewise_grace": 0.0,
+ "exit_piecewise_slope": 1.0,
+ }
+ )
+ params_g1 = self.DEFAULT_PARAMS.copy()
+ params_g1.update(
+ {
+ "exit_factor_mode": "piecewise",
+ "exit_piecewise_grace": 1.0,
+ "exit_piecewise_slope": 1.0,
+ }
+ )
+ val_g0 = compute_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g0)
+ val_g1 = compute_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g1)
+ # With grace=1.0 no attenuation up to 1.0 ratio → value should be higher
+ self.assertGreater(
+ val_g1,
+ val_g0,
+ "Piecewise grace=1.0 should delay attenuation vs grace=0.0",
+ )
+ # Linear slope zero vs positive
+ params_lin0 = self.DEFAULT_PARAMS.copy()
+ params_lin0.update({"exit_factor_mode": "linear", "exit_linear_slope": 0.0})
+ params_lin1 = self.DEFAULT_PARAMS.copy()
+ params_lin1.update({"exit_factor_mode": "linear", "exit_linear_slope": 2.0})
+ val_lin0 = compute_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin0)
+ val_lin1 = compute_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin1)
+ self.assertGreater(
+ val_lin0,
+ val_lin1,
+ "Linear slope=0 should yield no attenuation vs slope>0",
+ )
+
+ def test_piecewise_slope_zero_constant_after_grace(self):
+ """Piecewise slope=0 should yield flat factor after grace boundary."""
+ from reward_space_analysis import compute_exit_factor
+ params = self.DEFAULT_PARAMS.copy()
+ params.update(
+ {
+ "exit_factor_mode": "piecewise",
+ "exit_piecewise_grace": 0.3,
+ "exit_piecewise_slope": 0.0,
+ }
+ )
+ base_factor = 100.0
+ pnl = 0.04
+ pnl_factor = 1.2
+ ratios = [0.3, 0.6, 1.0, 1.4]
+ values = [
+ compute_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios
+ ]
+ # All factors should be (approximately) identical after grace (no attenuation)
+ first = values[0]
+ for v in values[1:]:
+ self.assertAlmostEqualFloat(
+ v,
+ first,
+ tolerance=1e-9,
+ msg=f"Piecewise slope=0 factor drift at ratio set {ratios} => {values}",
+ )
+
+ def test_legacy_step_non_monotonic(self):
+ """Legacy mode applies step change at duration_ratio=1 (should not be monotonic)."""
+ from reward_space_analysis import compute_exit_factor
+ params = self.DEFAULT_PARAMS.copy()
+ params["exit_factor_mode"] = "legacy"
+ base_factor = 100.0
+ pnl = 0.02
+ pnl_factor = 1.0
+ # ratio below 1 vs above 1
+ below = compute_exit_factor(base_factor, pnl, pnl_factor, 0.5, params)
+ above = compute_exit_factor(base_factor, pnl, pnl_factor, 1.5, params)
+        # Legacy multiplies by 1.5 pre-threshold and 0.5 post-threshold, so the
+        # below/above ratio should be ~ 1.5 / 0.5 = 3 (checked with margin below).
+ self.assertGreater(
+ below, above, "Legacy pre-threshold factor should exceed post-threshold"
+ )
+ ratio = below / max(above, 1e-12)
+ self.assertGreater(
+ ratio,
+ 2.5,
+ f"Legacy step ratio unexpectedly small (expected ~3, got {ratio:.3f})",
+ )
+
+ def test_exit_factor_non_negative_with_positive_pnl(self):
+ """Exit factor must not be negative when pnl >= 0 (invariant clamp)."""
+ from reward_space_analysis import compute_exit_factor
+ params = self.DEFAULT_PARAMS.copy()
+ # Try multiple modes / extreme params
+ modes = ["linear", "power", "piecewise", "half_life", "sqrt", "legacy"]
+ base_factor = 100.0
+ pnl = 0.05
+ pnl_factor = 2.0 # amplified
+ for mode in modes:
+ params_mode = params.copy()
+ params_mode["exit_factor_mode"] = mode
+ val = compute_exit_factor(base_factor, pnl, pnl_factor, 2.0, params_mode)
+ self.assertGreaterEqual(
+ val, 0.0, f"Exit factor should be >=0 for non-negative pnl in mode {mode}"
+ )
+
+
+class TestParameterValidation(RewardSpaceTestBase):
+ """Tests for validate_reward_parameters adjustments and reasons."""
+
+ def test_validate_reward_parameters_adjustments(self):
+ from reward_space_analysis import validate_reward_parameters
+
+ raw = self.DEFAULT_PARAMS.copy()
+ # Introduce out-of-bound values
+ raw["idle_penalty_scale"] = -5.0 # < min 0
+ raw["efficiency_center"] = 2.0 # > max 1
+ raw["exit_power_tau"] = 0.0 # below min (treated as 1e-6)
+ sanitized, adjustments = validate_reward_parameters(raw)
+ # Idle penalty scale should be clamped to 0.0
+ self.assertEqual(sanitized["idle_penalty_scale"], 0.0)
+ self.assertIn("idle_penalty_scale", adjustments)
+ self.assertIsInstance(adjustments["idle_penalty_scale"]["reason"], str)
+ self.assertIn("min=0.0", str(adjustments["idle_penalty_scale"]["reason"]))
+ # Efficiency center should be clamped to 1.0
+ self.assertEqual(sanitized["efficiency_center"], 1.0)
+ self.assertIn("efficiency_center", adjustments)
+ self.assertIn("max=1.0", str(adjustments["efficiency_center"]["reason"]))
+ # exit_power_tau should be raised to lower bound (>=1e-6)
+ self.assertGreaterEqual(float(sanitized["exit_power_tau"]), 1e-6)
+ self.assertIn("exit_power_tau", adjustments)
+ self.assertIn("min=", str(adjustments["exit_power_tau"]["reason"]))
+
+ def test_idle_and_holding_penalty_power(self):
+ """Test non-linear scaling when penalty powers != 1."""
+ params = self.DEFAULT_PARAMS.copy()
+ params["idle_penalty_power"] = 2.0
+ params["max_idle_duration_candles"] = 100
+ base_factor = 90.0
+ profit_target = 0.03
+ # Idle penalties for durations 20 vs 40 (quadratic → (40/100)^2 / (20/100)^2 = (0.4^2)/(0.2^2)=4)
+ ctx_a = RewardContext(
+ pnl=0.0,
+ trade_duration=0,
+ idle_duration=20,
+ max_trade_duration=128,
+ max_unrealized_profit=0.0,
+ min_unrealized_profit=0.0,
+ position=Positions.Neutral,
+ action=Actions.Neutral,
+ force_action=None,
+ )
+ ctx_b = dataclasses.replace(ctx_a, idle_duration=40)
+ br_a = calculate_reward(
+ ctx_a,
+ params,
+ base_factor=base_factor,
+ profit_target=profit_target,
+ risk_reward_ratio=1.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ br_b = calculate_reward(
+ ctx_b,
+ params,
+ base_factor=base_factor,
+ profit_target=profit_target,
+ risk_reward_ratio=1.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ ratio_quadratic = 0.0
+ if br_a.idle_penalty != 0:
+ ratio_quadratic = br_b.idle_penalty / br_a.idle_penalty
+ # Both negative; absolute ratio should be close to 4
+ self.assertAlmostEqual(
+ abs(ratio_quadratic),
+ 4.0,
+ delta=0.8,
+ msg=f"Idle penalty quadratic scaling mismatch (ratio={ratio_quadratic})",
+ )
+ # Holding penalty with power 2: durations just above threshold
+ params["holding_penalty_power"] = 2.0
+ ctx_h1 = RewardContext(
+ pnl=0.0,
+ trade_duration=130,
+ idle_duration=0,
+ max_trade_duration=100,
+ max_unrealized_profit=0.0,
+ min_unrealized_profit=0.0,
+ position=Positions.Long,
+ action=Actions.Neutral,
+ force_action=None,
+ )
+ ctx_h2 = dataclasses.replace(ctx_h1, trade_duration=140)
+ br_h1 = calculate_reward(
+ ctx_h1,
+ params,
+ base_factor=base_factor,
+ profit_target=profit_target,
+ risk_reward_ratio=1.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ br_h2 = calculate_reward(
+ ctx_h2,
+ params,
+ base_factor=base_factor,
+ profit_target=profit_target,
+ risk_reward_ratio=1.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ # Quadratic scaling: ((140-100)/(130-100))^2 = (40/30)^2 ≈ 1.777...
+ hold_ratio = 0.0
+ if br_h1.holding_penalty != 0:
+ hold_ratio = br_h2.holding_penalty / br_h1.holding_penalty
+ self.assertAlmostEqual(
+ abs(hold_ratio),
+ (40 / 30) ** 2,
+ delta=0.4,
+ msg=f"Holding penalty quadratic scaling mismatch (ratio={hold_ratio})",
+ )
+
+ def test_exit_factor_threshold_warning_emission(self):
+ """Ensure a RuntimeWarning is emitted when exit_factor exceeds threshold (no capping)."""
+ import warnings as _warnings
+
+ params = self.DEFAULT_PARAMS.copy()
+ params["exit_factor_threshold"] = 10.0 # low threshold to trigger easily
+ # Remove base_factor to allow argument override
+ params.pop("base_factor", None)
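+        # With base_factor=5000.0 and the assumed roughly linear scaling of the exit
+        # factor, the factor should far exceed the 10.0 threshold and emit the warning.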
+        # _mk_context is defined on TestRewardRobustness, not on this class, so build
+        # the equivalent context explicitly (same defaults, pnl/duration overridden).
+        context = RewardContext(
+            pnl=0.06,
+            trade_duration=10,
+            idle_duration=0,
+            max_trade_duration=100,
+            max_unrealized_profit=0.05,
+            min_unrealized_profit=0.01,
+            position=Positions.Long,
+            action=Actions.Long_exit,
+            force_action=None,
+        )
+ with _warnings.catch_warnings(record=True) as w:
+ _warnings.simplefilter("always")
+ br = calculate_reward(
+ context,
+ params,
+ base_factor=5000.0, # large enough to exceed threshold
+ profit_target=0.03,
+ risk_reward_ratio=2.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ self.assertGreater(br.exit_component, 0.0)
+ emitted = [warn for warn in w if issubclass(warn.category, RuntimeWarning)]
+ self.assertTrue(
+ len(emitted) >= 1, "Expected a RuntimeWarning for exit factor threshold"
+ )
+ # Confirm message indicates threshold exceedance (supports legacy and new message format)
+ self.assertTrue(
+ any(
+ (
+ "exceeded threshold" in str(warn.message)
+ or "exceeds threshold" in str(warn.message)
+ or "|factor|=" in str(warn.message)
+ )
+ for warn in emitted
+ ),
+ "Warning message should indicate threshold exceedance",
+ )
+
+ def test_public_wrapper_compute_exit_factor(self):
+ """Basic sanity check of newly exposed compute_exit_factor wrapper."""
+ from reward_space_analysis import compute_exit_factor
+
+ params = self.DEFAULT_PARAMS.copy()
+ params["exit_factor_mode"] = "sqrt"
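+        # Under an assumed sqrt attenuation ~ 1 / sqrt(1 + r): f(r=0) = base_factor and
+        # f(r=1) ≈ base_factor / sqrt(2), so f1 > f2 is the expected ordering.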
+ f1 = compute_exit_factor(100.0, 0.02, 1.0, 0.0, params)
+ f2 = compute_exit_factor(100.0, 0.02, 1.0, 1.0, params)
+ self.assertGreater(
+ f1, f2, "Attenuation should reduce factor at higher duration ratio"
+ )
+
if __name__ == "__main__":
# Configure test discovery and execution