"pytest>=6.0",
"ruff",
"coverage",
+ "pytest-cov>=7.0.0",
]
[build-system]
from .test_robustness import TestRewardRobustnessAndBoundaries
from .test_statistics import TestStatistics
from .test_utilities import (
- TestBootstrapStatistics,
TestCsvAndSimulationOptions,
TestLoadRealEpisodes,
TestParamsPropagation,
"TestPrivateFunctions",
"TestRewardRobustnessAndBoundaries",
"TestLoadRealEpisodes",
- "TestBootstrapStatistics",
"TestReportFormatting",
"TestCsvAndSimulationOptions",
"TestParamsPropagation",
"""Public API + helper utility tests."""
def test_parse_overrides(self):
+ """Test parse overrides."""
overrides = ["alpha=1.5", "mode=linear", "limit=42"]
result = parse_overrides(overrides)
self.assertEqual(result["alpha"], 1.5)
parse_overrides(["badpair"])
def test_api_simulation_and_reward_smoke(self):
+ """Test api simulation and reward smoke."""
df = simulate_samples(
params=self.base_params(max_trade_duration_candles=40),
num_samples=20,
class TestPrivateFunctions(RewardSpaceTestBase):
"""Test private functions through public API calls."""
- def test_idle_penalty_via_rewards(self):
- """Test idle penalty calculation via reward calculation."""
- context = self.make_ctx(
- pnl=0.0,
- trade_duration=0,
- idle_duration=20,
- max_unrealized_profit=0.0,
- min_unrealized_profit=0.0,
- position=Positions.Neutral,
- action=Actions.Neutral,
- )
- breakdown = calculate_reward(
- context,
- self.DEFAULT_PARAMS,
- base_factor=self.TEST_BASE_FACTOR,
- profit_target=self.TEST_PROFIT_TARGET,
- risk_reward_ratio=1.0,
- short_allowed=True,
- action_masking=True,
- )
- self.assertLess(breakdown.idle_penalty, 0, "Idle penalty should be negative")
- self.assertAlmostEqualFloat(
- breakdown.total,
- breakdown.idle_penalty
- + breakdown.reward_shaping
- + breakdown.entry_additive
- + breakdown.exit_additive,
- tolerance=self.TOL_IDENTITY_RELAXED,
- msg="Total should equal sum of components (idle + shaping/additives)",
- )
-
- def test_hold_penalty_via_rewards(self):
- """Test hold penalty calculation via reward calculation."""
- context = self.make_ctx(
- pnl=0.01,
- trade_duration=150,
- idle_duration=0,
- max_unrealized_profit=0.02,
- min_unrealized_profit=0.0,
- position=Positions.Long,
- action=Actions.Neutral,
- )
- breakdown = calculate_reward(
- context,
- self.DEFAULT_PARAMS,
- base_factor=self.TEST_BASE_FACTOR,
- profit_target=self.TEST_PROFIT_TARGET,
- risk_reward_ratio=self.TEST_RR,
- short_allowed=True,
- action_masking=True,
- )
- self.assertLess(breakdown.hold_penalty, 0, "Hold penalty should be negative")
- self.assertAlmostEqualFloat(
- breakdown.total,
- breakdown.hold_penalty
- + breakdown.reward_shaping
- + breakdown.entry_additive
- + breakdown.exit_additive,
- tolerance=self.TOL_IDENTITY_RELAXED,
- msg="Total should equal sum of components (hold + shaping/additives)",
- )
-
def test_exit_reward_calculation(self):
"""Test exit reward calculation with various scenarios."""
scenarios = [
msg="Total should equal invalid penalty plus shaping/additives",
)
- def test_hold_penalty_zero_before_max_duration(self):
- """Test hold penalty logic: zero penalty before max_trade_duration."""
- max_duration = 128
- test_cases = [
- (64, "before max_duration"),
- (127, "just before max_duration"),
- (128, "exactly at max_duration"),
- (129, "just after max_duration"),
- (192, "well after max_duration"),
- ]
- for trade_duration, description in test_cases:
- with self.subTest(duration=trade_duration, desc=description):
- context = self.make_ctx(
- pnl=0.0,
- trade_duration=trade_duration,
- idle_duration=0,
- max_unrealized_profit=0.0,
- min_unrealized_profit=0.0,
- position=Positions.Long,
- action=Actions.Neutral,
- )
- breakdown = calculate_reward(
- context,
- self.DEFAULT_PARAMS,
- base_factor=self.TEST_BASE_FACTOR,
- profit_target=self.TEST_PROFIT_TARGET,
- risk_reward_ratio=1.0,
- short_allowed=True,
- action_masking=True,
- )
- duration_ratio = trade_duration / max_duration
- if duration_ratio < 1.0:
- self.assertEqual(
- breakdown.hold_penalty,
- 0.0,
- f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})",
- )
- elif duration_ratio == 1.0:
- self.assertEqual(
- breakdown.hold_penalty,
- 0.0,
- f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})",
- )
- else:
- self.assertLess(
- breakdown.hold_penalty,
- 0.0,
- f"Hold penalty should be negative {description} (ratio={duration_ratio:.2f})",
- )
- self.assertAlmostEqualFloat(
- breakdown.total,
- breakdown.hold_penalty
- + breakdown.reward_shaping
- + breakdown.entry_additive
- + breakdown.exit_additive,
- tolerance=self.TOL_IDENTITY_RELAXED,
- msg=f"Total mismatch including shaping {description}",
- )
-
- def test_hold_penalty_progressive_scaling(self):
- """Test that hold penalty scales progressively after max_duration."""
- params = self.base_params(max_trade_duration_candles=100)
- durations = [150, 200, 300]
- penalties: list[float] = []
- for duration in durations:
- context = self.make_ctx(
- pnl=0.0,
- trade_duration=duration,
- idle_duration=0,
- max_unrealized_profit=0.0,
- min_unrealized_profit=0.0,
- position=Positions.Long,
- action=Actions.Neutral,
- )
- breakdown = calculate_reward(
- context,
- params,
- base_factor=self.TEST_BASE_FACTOR,
- profit_target=self.TEST_PROFIT_TARGET,
- risk_reward_ratio=self.TEST_RR,
- short_allowed=True,
- action_masking=True,
- )
- penalties.append(breakdown.hold_penalty)
- for i in range(1, len(penalties)):
- self.assertLessEqual(
- penalties[i],
- penalties[i - 1],
- f"Penalty should increase with duration: {penalties[i]} > {penalties[i - 1]}",
- )
-
def test_new_invariant_and_warn_parameters(self):
"""Ensure new tunables (check_invariants, exit_factor_threshold) exist and behave.
from reward_space_analysis import (
DEFAULT_MODEL_REWARD_PARAMETERS,
+ PBRS_INVARIANCE_TOL,
_compute_entry_additive,
_compute_exit_additive,
_compute_exit_potential,
_get_float_param,
apply_potential_shaping,
apply_transform,
+ simulate_samples,
validate_reward_parameters,
)
self.assertTrue(abs(apply_transform("softsign", 100.0)) < 1.0)
self.assertTrue(abs(apply_transform("softsign", -100.0)) < 1.0)
+ def test_canonical_invariance_flag_and_sum(self):
+ """Canonical mode + no additives -> pbrs_invariant True and Σ shaping ≈ 0."""
+ params = self.base_params(
+ exit_potential_mode="canonical",
+ entry_additive_enabled=False,
+ exit_additive_enabled=False,
+ hold_potential_enabled=True,
+ )
+ df = simulate_samples(
+ params={**params, "max_trade_duration_candles": 100},
+ num_samples=400,
+ seed=self.SEED,
+ base_factor=self.TEST_BASE_FACTOR,
+ profit_target=self.TEST_PROFIT_TARGET,
+ risk_reward_ratio=self.TEST_RR,
+ max_duration_ratio=2.0,
+ trading_mode="margin",
+ pnl_base_std=self.TEST_PNL_STD,
+ pnl_duration_vol_scale=self.TEST_PNL_DUR_VOL_SCALE,
+ )
+ unique_flags = set(df["pbrs_invariant"].unique().tolist())
+ self.assertEqual(unique_flags, {True}, f"Unexpected invariant flags: {unique_flags}")
+ total_shaping = float(df["reward_shaping"].sum())
+ self.assertLess(
+ abs(total_shaping),
+ PBRS_INVARIANCE_TOL,
+ f"Canonical invariance violated: Σ shaping = {total_shaping}",
+ )
+
+ def test_non_canonical_flag_false_and_sum_nonzero(self):
+ """Non-canonical exit potential (progressive_release) -> pbrs_invariant False and Σ shaping != 0."""
+ params = self.base_params(
+ exit_potential_mode="progressive_release",
+ exit_potential_decay=0.25,
+ entry_additive_enabled=False,
+ exit_additive_enabled=False,
+ hold_potential_enabled=True,
+ )
+ df = simulate_samples(
+ params={**params, "max_trade_duration_candles": 100},
+ num_samples=400,
+ seed=self.SEED,
+ base_factor=self.TEST_BASE_FACTOR,
+ profit_target=self.TEST_PROFIT_TARGET,
+ risk_reward_ratio=self.TEST_RR,
+ max_duration_ratio=2.0,
+ trading_mode="margin",
+ pnl_base_std=self.TEST_PNL_STD,
+ pnl_duration_vol_scale=self.TEST_PNL_DUR_VOL_SCALE,
+ )
+ unique_flags = set(df["pbrs_invariant"].unique().tolist())
+ self.assertEqual(unique_flags, {False}, f"Unexpected invariant flags: {unique_flags}")
+ total_shaping = float(df["reward_shaping"].sum())
+ self.assertGreater(
+ abs(total_shaping),
+ PBRS_INVARIANCE_TOL * 10,
+ f"Expected non-zero Σ shaping in non-canonical mode (got {total_shaping})",
+ )
+
def test_asinh_transform(self):
"""asinh transform: x / sqrt(1 + x^2) in (-1, 1)."""
self.assertAlmostEqualFloat(apply_transform("asinh", 0.0), 0.0)
tolerance=self.TOL_IDENTITY_RELAXED,
)
- def test_hold_potential_basic(self):
- """Test basic hold potential calculation."""
- params = {
- "hold_potential_enabled": True,
- "hold_potential_scale": 1.0,
- "hold_potential_gain": 1.0,
- "hold_potential_transform_pnl": "tanh",
- "hold_potential_transform_duration": "tanh",
- }
- val = _compute_hold_potential(0.5, 0.3, params)
- self.assertFinite(val, name="hold_potential")
-
- def test_entry_additive_disabled(self):
- """Test entry additive when disabled."""
- params = {"entry_additive_enabled": False}
- val = _compute_entry_additive(0.5, 0.3, params)
- self.assertEqual(val, 0.0)
-
- def test_exit_additive_disabled(self):
- """Test exit additive when disabled."""
- params = {"exit_additive_enabled": False}
- val = _compute_exit_additive(0.5, 0.3, params)
- self.assertEqual(val, 0.0)
+ def test_additive_components_disabled_return_zero(self):
+ """Test entry and exit additives return zero when disabled."""
+ # Test entry additive disabled
+ params_entry = {"entry_additive_enabled": False}
+ val_entry = _compute_entry_additive(0.5, 0.3, params_entry)
+ self.assertEqual(val_entry, 0.0)
+
+ # Test exit additive disabled
+ params_exit = {"exit_additive_enabled": False}
+ val_exit = _compute_exit_additive(0.5, 0.3, params_exit)
+ self.assertEqual(val_exit, 0.0)
def test_exit_potential_canonical(self):
+ """Test exit potential canonical."""
params = self.base_params(
exit_potential_mode="canonical",
hold_potential_enabled=True,
self.assertAlmostEqualFloat(s_base, s_scaled, tolerance=self.TOL_DISTRIB_SHAPE)
self.assertAlmostEqualFloat(k_base, k_scaled, tolerance=self.TOL_DISTRIB_SHAPE)
+ def test_pbrs_non_canonical_report_generation(self):
+ """Generate synthetic invariance section with non-zero shaping to assert Non-canonical classification."""
+ import re
+
+ import pandas as pd
+
+ from reward_space_analysis import PBRS_INVARIANCE_TOL
+
+ df = pd.DataFrame(
+ {
+ "reward_shaping": [0.01, -0.002],
+ "reward_entry_additive": [0.0, 0.0],
+ "reward_exit_additive": [0.001, 0.0],
+ }
+ )
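+ # Σ shaping = 0.01 + (-0.002) = 0.008; the regex assertion below expects the formatted value 0.008000.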
+ total_shaping = df["reward_shaping"].sum()
+ self.assertGreater(abs(total_shaping), PBRS_INVARIANCE_TOL)
+ invariance_status = "❌ Non-canonical"
+ section = []
+ section.append("**PBRS Invariance Summary:**\n")
+ section.append("| Field | Value |\n")
+ section.append("|-------|-------|\n")
+ section.append(f"| Invariance | {invariance_status} |\n")
+ section.append(f"| Note | Total shaping = {total_shaping:.6f} (non-zero) |\n")
+ section.append(f"| Σ Shaping Reward | {total_shaping:.6f} |\n")
+ section.append(f"| Abs Σ Shaping Reward | {abs(total_shaping):.6e} |\n")
+ section.append(f"| Σ Entry Additive | {df['reward_entry_additive'].sum():.6f} |\n")
+ section.append(f"| Σ Exit Additive | {df['reward_exit_additive'].sum():.6f} |\n")
+ content = "".join(section)
+ self.assertIn("❌ Non-canonical", content)
+ self.assertRegex(content, "Σ Shaping Reward \\| 0\\.008000 \\|")
+ m_abs = re.search("Abs Σ Shaping Reward \\| ([0-9.]+e[+-][0-9]{2}) \\|", content)
+ self.assertIsNotNone(m_abs)
+ if m_abs:
+ val = float(m_abs.group(1))
+ self.assertAlmostEqual(abs(total_shaping), val, places=12)
+
+ def test_potential_gamma_boundary_values_stability(self):
+ """Test potential gamma boundary values (0 and ≈1) produce bounded shaping."""
+ for gamma in [0.0, 0.999999]:
+ params = self.base_params(
+ hold_potential_enabled=True,
+ entry_additive_enabled=False,
+ exit_additive_enabled=False,
+ exit_potential_mode="canonical",
+ potential_gamma=gamma,
+ )
+ _tot, shap, next_pot = apply_potential_shaping(
+ base_reward=0.0,
+ current_pnl=0.02,
+ current_duration_ratio=0.3,
+ next_pnl=0.025,
+ next_duration_ratio=0.35,
+ is_exit=False,
+ last_potential=0.0,
+ params=params,
+ )
+ self.assertTrue(np.isfinite(shap))
+ self.assertTrue(np.isfinite(next_pot))
+ self.assertLessEqual(abs(shap), self.PBRS_MAX_ABS_SHAPING)
+
+ def test_report_cumulative_invariance_aggregation(self):
+ """Canonical telescoping term: small per-step mean drift, bounded increments."""
+ params = self.base_params(
+ hold_potential_enabled=True,
+ entry_additive_enabled=False,
+ exit_additive_enabled=False,
+ exit_potential_mode="canonical",
+ )
+ gamma = _get_float_param(
+ params, "potential_gamma", DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95)
+ )
+ rng = np.random.default_rng(321)
+ last_potential = 0.0
+ telescoping_sum = 0.0
+ max_abs_step = 0.0
+ steps = 0
+ for _ in range(500):
+ is_exit = rng.uniform() < 0.1
+ current_pnl = float(rng.normal(0, 0.05))
+ current_dur = float(rng.uniform(0, 1))
+ next_pnl = 0.0 if is_exit else float(rng.normal(0, 0.05))
+ next_dur = 0.0 if is_exit else float(rng.uniform(0, 1))
+ _tot, _shap, next_potential = apply_potential_shaping(
+ base_reward=0.0,
+ current_pnl=current_pnl,
+ current_duration_ratio=current_dur,
+ next_pnl=next_pnl,
+ next_duration_ratio=next_dur,
+ is_exit=is_exit,
+ last_potential=last_potential,
+ params=params,
+ )
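+ # PBRS telescoping increment: F_t = γ·Φ(s_{t+1}) − Φ(s_t); the running sum below should show only small mean drift.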
+ inc = gamma * next_potential - last_potential
+ telescoping_sum += inc
+ if abs(inc) > max_abs_step:
+ max_abs_step = abs(inc)
+ steps += 1
+ if is_exit:
+ last_potential = 0.0
+ else:
+ last_potential = next_potential
+ mean_drift = telescoping_sum / max(1, steps)
+ self.assertLess(
+ abs(mean_drift),
+ 0.02,
+ f"Per-step telescoping drift too large (mean={mean_drift}, steps={steps})",
+ )
+ self.assertLessEqual(
+ max_abs_step,
+ self.PBRS_MAX_ABS_SHAPING,
+ f"Unexpected large telescoping increment (max={max_abs_step})",
+ )
+
+ def test_report_explicit_non_invariance_progressive_release(self):
+ """progressive_release should generally yield non-zero cumulative shaping (release leak)."""
+ params = self.base_params(
+ hold_potential_enabled=True,
+ entry_additive_enabled=False,
+ exit_additive_enabled=False,
+ exit_potential_mode="progressive_release",
+ exit_potential_decay=0.25,
+ )
+ rng = np.random.default_rng(321)
+ last_potential = 0.0
+ shaping_sum = 0.0
+ for _ in range(160):
+ is_exit = rng.uniform() < 0.15
+ next_pnl = 0.0 if is_exit else float(rng.normal(0, 0.07))
+ next_dur = 0.0 if is_exit else float(rng.uniform(0, 1))
+ _tot, shap, next_pot = apply_potential_shaping(
+ base_reward=0.0,
+ current_pnl=float(rng.normal(0, 0.07)),
+ current_duration_ratio=float(rng.uniform(0, 1)),
+ next_pnl=next_pnl,
+ next_duration_ratio=next_dur,
+ is_exit=is_exit,
+ last_potential=last_potential,
+ params=params,
+ )
+ shaping_sum += shap
+ last_potential = 0.0 if is_exit else next_pot
+ self.assertGreater(
+ abs(shaping_sum),
+ PBRS_INVARIANCE_TOL * 50,
+ f"Expected non-zero Σ shaping (got {shaping_sum})",
+ )
+
if __name__ == "__main__":
unittest.main()
#!/usr/bin/env python3
"""Tests for reward calculation components and algorithms."""
+import dataclasses
import math
import unittest
Actions,
Positions,
RewardContext,
+ _compute_hold_potential,
_get_exit_factor,
+ _get_float_param,
_get_pnl_factor,
calculate_reward,
)
class TestRewardComponents(RewardSpaceTestBase):
+ def test_hold_potential_computation_finite(self):
+ """Test hold potential computation returns finite values."""
+ params = {
+ "hold_potential_enabled": True,
+ "hold_potential_scale": 1.0,
+ "hold_potential_gain": 1.0,
+ "hold_potential_transform_pnl": "tanh",
+ "hold_potential_transform_duration": "tanh",
+ }
+ val = _compute_hold_potential(0.5, 0.3, params)
+ self.assertFinite(val, name="hold_potential")
+
+ def test_hold_penalty_comprehensive(self):
+ """Comprehensive hold penalty test: calculation, thresholds, and progressive scaling."""
+ # Test 1: Basic hold penalty calculation via reward calculation (trade_duration > max_duration)
+ context = self.make_ctx(
+ pnl=0.01,
+ trade_duration=150, # > default max_duration (128)
+ idle_duration=0,
+ max_unrealized_profit=0.02,
+ min_unrealized_profit=0.0,
+ position=Positions.Long,
+ action=Actions.Neutral,
+ )
+ breakdown = calculate_reward(
+ context,
+ self.DEFAULT_PARAMS,
+ base_factor=self.TEST_BASE_FACTOR,
+ profit_target=self.TEST_PROFIT_TARGET,
+ risk_reward_ratio=self.TEST_RR,
+ short_allowed=True,
+ action_masking=True,
+ )
+ self.assertLess(breakdown.hold_penalty, 0, "Hold penalty should be negative")
+ self.assertAlmostEqualFloat(
+ breakdown.total,
+ breakdown.hold_penalty
+ + breakdown.reward_shaping
+ + breakdown.entry_additive
+ + breakdown.exit_additive,
+ tolerance=self.TOL_IDENTITY_RELAXED,
+ msg="Total should equal sum of components (hold + shaping/additives)",
+ )
+
+ # Test 2: Zero penalty before max_duration threshold
+ max_duration = 128
+ test_cases = [
+ (64, "before max_duration"),
+ (127, "just before max_duration"),
+ (128, "exactly at max_duration"),
+ (129, "just after max_duration"),
+ ]
+ for trade_duration, description in test_cases:
+ with self.subTest(duration=trade_duration, desc=description):
+ context = self.make_ctx(
+ pnl=0.0,
+ trade_duration=trade_duration,
+ idle_duration=0,
+ position=Positions.Long,
+ action=Actions.Neutral,
+ )
+ breakdown = calculate_reward(
+ context,
+ self.DEFAULT_PARAMS,
+ base_factor=self.TEST_BASE_FACTOR,
+ profit_target=self.TEST_PROFIT_TARGET,
+ risk_reward_ratio=1.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ duration_ratio = trade_duration / max_duration
+ if duration_ratio < 1.0:
+ self.assertEqual(
+ breakdown.hold_penalty,
+ 0.0,
+ f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})",
+ )
+ elif duration_ratio == 1.0:
+ # At exact max duration, penalty can be 0.0 or slightly negative (implementation dependent)
+ self.assertLessEqual(
+ breakdown.hold_penalty,
+ 0.0,
+ f"Hold penalty should be <= 0.0 {description} (ratio={duration_ratio:.2f})",
+ )
+ else:
+ # Beyond max duration, penalty should be strictly negative
+ self.assertLess(
+ breakdown.hold_penalty,
+ 0.0,
+ f"Hold penalty should be negative {description} (ratio={duration_ratio:.2f})",
+ )
+
+ # Test 3: Progressive scaling after max_duration
+ params = self.base_params(max_trade_duration_candles=100)
+ durations = [150, 200, 300]
+ penalties: list[float] = []
+ for duration in durations:
+ context = self.make_ctx(
+ pnl=0.0,
+ trade_duration=duration,
+ idle_duration=0,
+ position=Positions.Long,
+ action=Actions.Neutral,
+ )
+ breakdown = calculate_reward(
+ context,
+ params,
+ base_factor=self.TEST_BASE_FACTOR,
+ profit_target=self.TEST_PROFIT_TARGET,
+ risk_reward_ratio=self.TEST_RR,
+ short_allowed=True,
+ action_masking=True,
+ )
+ penalties.append(breakdown.hold_penalty)
+ for i in range(1, len(penalties)):
+ self.assertLessEqual(
+ penalties[i],
+ penalties[i - 1],
+ f"Penalty should increase (more negative) with duration: {penalties[i]} <= {penalties[i - 1]}",
+ )
+
+ def test_idle_penalty_via_rewards(self):
+ """Test idle penalty calculation via reward calculation."""
+ context = self.make_ctx(
+ pnl=0.0,
+ trade_duration=0,
+ idle_duration=20,
+ max_unrealized_profit=0.0,
+ min_unrealized_profit=0.0,
+ position=Positions.Neutral,
+ action=Actions.Neutral,
+ )
+ breakdown = calculate_reward(
+ context,
+ self.DEFAULT_PARAMS,
+ base_factor=self.TEST_BASE_FACTOR,
+ profit_target=self.TEST_PROFIT_TARGET,
+ risk_reward_ratio=1.0,
+ short_allowed=True,
+ action_masking=True,
+ )
+ self.assertLess(breakdown.idle_penalty, 0, "Idle penalty should be negative")
+ self.assertAlmostEqualFloat(
+ breakdown.total,
+ breakdown.idle_penalty
+ + breakdown.reward_shaping
+ + breakdown.entry_additive
+ + breakdown.exit_additive,
+ tolerance=self.TOL_IDENTITY_RELAXED,
+ msg="Total should equal sum of components (idle + shaping/additives)",
+ )
+
"""Core reward component tests."""
- def test_reward_calculation_scenarios_basic(self):
- """Reward calculation scenarios: expected components become non-zero."""
+ def test_reward_calculation_component_activation(self):
+ """Test reward component activation: idle_penalty and exit_component trigger correctly."""
test_cases = [
(Positions.Neutral, Actions.Neutral, "idle_penalty"),
(Positions.Long, Actions.Long_exit, "exit_component"),
self.assertFinite(breakdown.total, name="breakdown.total")
def test_efficiency_zero_policy(self):
+ """Test efficiency zero policy."""
ctx = self.make_ctx(
pnl=0.0,
trade_duration=1,
self.assertAlmostEqualFloat(pnl_factor, 1.0, tolerance=self.TOL_GENERIC_EQ)
def test_max_idle_duration_candles_logic(self):
+ """Test max idle duration candles logic."""
params_small = self.base_params(max_idle_duration_candles=50)
params_large = self.base_params(max_idle_duration_candles=200)
base_factor = self.TEST_BASE_FACTOR
f"Long/Short asymmetry pnl={pnl}: long={br_long.exit_component}, short={br_short.exit_component}",
)
+ def test_idle_penalty_fallback_and_proportionality(self):
+ """Idle penalty fallback denominator & proportional scaling."""
+ params = self.base_params(max_idle_duration_candles=None, max_trade_duration_candles=100)
+ base_factor = 90.0
+ profit_target = self.TEST_PROFIT_TARGET
+ risk_reward_ratio = 1.0
+ ctx_a = self.make_ctx(
+ pnl=0.0,
+ trade_duration=0,
+ idle_duration=20,
+ position=Positions.Neutral,
+ action=Actions.Neutral,
+ )
+ ctx_b = dataclasses.replace(ctx_a, idle_duration=40)
+ br_a = calculate_reward(
+ ctx_a,
+ params,
+ base_factor=base_factor,
+ profit_target=profit_target,
+ risk_reward_ratio=risk_reward_ratio,
+ short_allowed=True,
+ action_masking=True,
+ )
+ br_b = calculate_reward(
+ ctx_b,
+ params,
+ base_factor=base_factor,
+ profit_target=profit_target,
+ risk_reward_ratio=risk_reward_ratio,
+ short_allowed=True,
+ action_masking=True,
+ )
+ self.assertLess(br_a.idle_penalty, 0.0)
+ self.assertLess(br_b.idle_penalty, 0.0)
+ ratio = br_b.idle_penalty / br_a.idle_penalty if br_a.idle_penalty != 0 else None
+ self.assertIsNotNone(ratio)
+ if ratio is not None:
+ self.assertAlmostEqualFloat(abs(ratio), 2.0, tolerance=0.2)
+ ctx_mid = dataclasses.replace(ctx_a, idle_duration=120)
+ br_mid = calculate_reward(
+ ctx_mid,
+ params,
+ base_factor=base_factor,
+ profit_target=profit_target,
+ risk_reward_ratio=risk_reward_ratio,
+ short_allowed=True,
+ action_masking=True,
+ )
+ self.assertLess(br_mid.idle_penalty, 0.0)
+ idle_penalty_scale = _get_float_param(params, "idle_penalty_scale", 0.5)
+ idle_penalty_power = _get_float_param(params, "idle_penalty_power", 1.025)
+ factor = _get_float_param(params, "base_factor", float(base_factor))
+ idle_factor = factor * (profit_target * risk_reward_ratio) / 4.0
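+ # Invert the assumed idle penalty model |penalty| ≈ idle_factor * idle_penalty_scale * (idle_duration / D) ** idle_penalty_power
+ # to recover the fallback denominator D (expected ≈ 4 × max_trade_duration = 400 here).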
+ observed_ratio = abs(br_mid.idle_penalty) / (idle_factor * idle_penalty_scale)
+ if observed_ratio > 0:
+ implied_D = 120 / observed_ratio ** (1 / idle_penalty_power)
+ self.assertAlmostEqualFloat(implied_D, 400.0, tolerance=20.0)
+
if __name__ == "__main__":
unittest.main()
#!/usr/bin/env python3
"""Robustness tests and boundary condition validation."""
-import dataclasses
import math
import unittest
import warnings
Positions,
RewardContext,
_get_exit_factor,
- _get_float_param,
_get_pnl_factor,
calculate_reward,
simulate_samples,
invalid_combinations = df[(df["pnl"].abs() <= self.EPS_BASE) & (df["reward_exit"] != 0)]
self.assertEqual(len(invalid_combinations), 0)
- def test_exit_factor_mathematical_formulas(self):
- """Mathematical correctness of exit factor calculations across modes."""
+ def test_exit_factor_comprehensive(self):
+ """Comprehensive exit factor test: mathematical correctness and monotonic attenuation."""
+ # Part 1: Mathematical formulas validation
context = self.make_ctx(
pnl=0.05,
trade_duration=50,
)
params = self.DEFAULT_PARAMS.copy()
duration_ratio = 50 / 100
+
+ # Test power mode
params["exit_attenuation_mode"] = "power"
params["exit_power_tau"] = 0.5
params["exit_plateau"] = False
action_masking=True,
)
self.assertGreater(reward_power.exit_component, 0)
+
+ # Test half_life mode with mathematical validation
params["exit_attenuation_mode"] = "half_life"
params["exit_half_life"] = 0.5
reward_half_life = calculate_reward(
tolerance=self.TOL_IDENTITY_RELAXED,
msg="Half-life attenuation mismatch: observed vs expected",
)
+
+ # Test linear mode
params["exit_attenuation_mode"] = "linear"
params["exit_linear_slope"] = 1.0
reward_linear = calculate_reward(
unique_rewards = set((f"{r:.6f}" for r in rewards))
self.assertGreater(len(unique_rewards), 1)
- def test_idle_penalty_fallback_and_proportionality(self):
- """Idle penalty fallback denominator & proportional scaling (robustness)."""
- params = self.base_params(max_idle_duration_candles=None, max_trade_duration_candles=100)
- base_factor = 90.0
- profit_target = self.TEST_PROFIT_TARGET
- risk_reward_ratio = 1.0
- ctx_a = self.make_ctx(
- pnl=0.0,
- trade_duration=0,
- idle_duration=20,
- position=Positions.Neutral,
- action=Actions.Neutral,
- )
- ctx_b = dataclasses.replace(ctx_a, idle_duration=40)
- br_a = calculate_reward(
- ctx_a,
- params,
- base_factor=base_factor,
- profit_target=profit_target,
- risk_reward_ratio=risk_reward_ratio,
- short_allowed=True,
- action_masking=True,
- )
- br_b = calculate_reward(
- ctx_b,
- params,
- base_factor=base_factor,
- profit_target=profit_target,
- risk_reward_ratio=risk_reward_ratio,
- short_allowed=True,
- action_masking=True,
- )
- self.assertLess(br_a.idle_penalty, 0.0)
- self.assertLess(br_b.idle_penalty, 0.0)
- ratio = br_b.idle_penalty / br_a.idle_penalty if br_a.idle_penalty != 0 else None
- self.assertIsNotNone(ratio)
- self.assertAlmostEqualFloat(abs(ratio), 2.0, tolerance=0.2)
- ctx_mid = dataclasses.replace(ctx_a, idle_duration=120)
- br_mid = calculate_reward(
- ctx_mid,
- params,
- base_factor=base_factor,
- profit_target=profit_target,
- risk_reward_ratio=risk_reward_ratio,
- short_allowed=True,
- action_masking=True,
- )
- self.assertLess(br_mid.idle_penalty, 0.0)
- idle_penalty_scale = _get_float_param(params, "idle_penalty_scale", 0.5)
- idle_penalty_power = _get_float_param(params, "idle_penalty_power", 1.025)
- factor = _get_float_param(params, "base_factor", float(base_factor))
- idle_factor = factor * (profit_target * risk_reward_ratio) / 4.0
- observed_ratio = abs(br_mid.idle_penalty) / (idle_factor * idle_penalty_scale)
- if observed_ratio > 0:
- implied_D = 120 / observed_ratio ** (1 / idle_penalty_power)
- self.assertAlmostEqualFloat(implied_D, 400.0, tolerance=20.0)
+ # Part 2: Monotonic attenuation validation
+ modes = list(ATTENUATION_MODES) + ["plateau_linear"]
+ base_factor = self.TEST_BASE_FACTOR
+ pnl = 0.05
+ pnl_factor = 1.0
+ for mode in modes:
+ with self.subTest(mode=mode):
+ if mode == "plateau_linear":
+ mode_params = self.base_params(
+ exit_attenuation_mode="linear",
+ exit_plateau=True,
+ exit_plateau_grace=0.2,
+ exit_linear_slope=1.0,
+ )
+ elif mode == "linear":
+ mode_params = self.base_params(
+ exit_attenuation_mode="linear", exit_linear_slope=1.2
+ )
+ elif mode == "power":
+ mode_params = self.base_params(
+ exit_attenuation_mode="power", exit_power_tau=0.5
+ )
+ elif mode == "half_life":
+ mode_params = self.base_params(
+ exit_attenuation_mode="half_life", exit_half_life=0.7
+ )
+ else:
+ mode_params = self.base_params(exit_attenuation_mode="sqrt")
+
+ ratios = np.linspace(0, 2, 15)
+ values = [
+ _get_exit_factor(base_factor, pnl, pnl_factor, r, mode_params) for r in ratios
+ ]
+
+ if mode == "plateau_linear":
+ grace = float(mode_params["exit_plateau_grace"])
+ filtered = [
+ (r, v)
+ for r, v in zip(ratios, values)
+ if r >= grace - self.TOL_IDENTITY_RELAXED
+ ]
+ values_to_check = [v for _, v in filtered]
+ else:
+ values_to_check = values
+
+ for earlier, later in zip(values_to_check, values_to_check[1:]):
+ self.assertLessEqual(
+ later,
+ earlier + self.TOL_IDENTITY_RELAXED,
+ f"Non-monotonic attenuation in mode={mode}",
+ )
def test_exit_factor_threshold_warning_and_non_capping(self):
"""Warning emission without capping when exit_factor_threshold exceeded."""
f"Alpha attenuation mismatch tau={tau} alpha={alpha} obs_ratio={observed_ratio} exp_ratio={expected_ratio}",
)
- def test_extreme_parameter_values(self):
+ def test_reward_calculation_extreme_parameters_stability(self):
+ """Test reward calculation extreme parameters stability."""
extreme_params = self.base_params(win_reward_factor=1000.0, base_factor=10000.0)
context = RewardContext(
pnl=0.05,
self.assertFinite(br.total, name="breakdown.total")
def test_exit_attenuation_modes_enumeration(self):
+ """Test exit attenuation modes enumeration."""
modes = ATTENUATION_MODES_WITH_LEGACY
for mode in modes:
with self.subTest(mode=mode):
self.assertFinite(br.exit_component, name="breakdown.exit_component")
self.assertFinite(br.total, name="breakdown.total")
- def test_exit_factor_monotonic_attenuation(self):
- """For attenuation modes: factor should be non-increasing w.r.t duration_ratio.
-
- Modes covered: sqrt, linear, power, half_life, plateau+linear (after grace).
- Legacy is excluded (non-monotonic by design). Plateau+linear includes flat grace then monotonic.
- """
- modes = list(ATTENUATION_MODES) + ["plateau_linear"]
- base_factor = self.TEST_BASE_FACTOR
- pnl = 0.05
- pnl_factor = 1.0
- for mode in modes:
- if mode == "plateau_linear":
- params = self.base_params(
- exit_attenuation_mode="linear",
- exit_plateau=True,
- exit_plateau_grace=0.2,
- exit_linear_slope=1.0,
- )
- elif mode == "linear":
- params = self.base_params(exit_attenuation_mode="linear", exit_linear_slope=1.2)
- elif mode == "power":
- params = self.base_params(exit_attenuation_mode="power", exit_power_tau=0.5)
- elif mode == "half_life":
- params = self.base_params(exit_attenuation_mode="half_life", exit_half_life=0.7)
- else:
- params = self.base_params(exit_attenuation_mode="sqrt")
- ratios = np.linspace(0, 2, 15)
- values = [_get_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios]
- if mode == "plateau_linear":
- grace = float(params["exit_plateau_grace"])
- filtered = [
- (r, v) for r, v in zip(ratios, values) if r >= grace - self.TOL_IDENTITY_RELAXED
- ]
- values_to_check = [v for _, v in filtered]
- else:
- values_to_check = values
- for earlier, later in zip(values_to_check, values_to_check[1:]):
- self.assertLessEqual(
- later,
- earlier + self.TOL_IDENTITY_RELAXED,
- f"Non-monotonic attenuation in mode={mode}",
- )
-
def test_exit_factor_boundary_parameters(self):
"""Test parameter edge cases: tau extremes, plateau grace edges, slope zero."""
base_factor = 50.0
self.assertLess(vals[-1], ref, "Attenuation should begin after grace boundary")
def test_plateau_continuity_at_grace_boundary(self):
+ """Test plateau continuity at grace boundary."""
modes = ["sqrt", "linear", "power", "half_life"]
grace = 0.8
eps = self.CONTINUITY_EPS_SMALL
if key in diagnostics:
self.assertFinite(diagnostics[key], name=key)
- def test_statistical_functions(self):
- """Smoke test statistical_hypothesis_tests on synthetic data (API integration)."""
+ def test_statistical_hypothesis_tests_api_integration(self):
+ """Test statistical_hypothesis_tests API integration with synthetic data."""
base = self.make_stats_df(n=200, seed=self.SEED, idle_pattern="mixed")
base.loc[:149, ["reward_idle", "reward_hold", "reward_exit"]] = 0.0
results = statistical_hypothesis_tests(base)
eff = res["effect_size_epsilon_sq"]
self.assertFinite(eff)
self.assertGreaterEqual(eff, 0)
+
+ def test_bootstrap_confidence_intervals_bounds_ordering(self):
+ """Test bootstrap confidence intervals return ordered finite bounds."""
+ test_data = self.make_stats_df(n=100, seed=self.SEED)
+ results = bootstrap_confidence_intervals(test_data, ["reward", "pnl"], n_bootstrap=100)
+ for metric, (mean, ci_low, ci_high) in results.items():
+ self.assertFinite(mean, name=f"mean[{metric}]")
+ self.assertFinite(ci_low, name=f"ci_low[{metric}]")
+ self.assertFinite(ci_high, name=f"ci_high[{metric}]")
+ self.assertLess(ci_low, ci_high)
+
+ def test_stats_bootstrap_shrinkage_with_sample_size(self):
+ """Bootstrap CI half-width decreases with larger sample (~1/sqrt(n) heuristic)."""
+ small = self._shift_scale_df(80)
+ large = self._shift_scale_df(800)
+ res_small = bootstrap_confidence_intervals(small, ["reward"], n_bootstrap=400)
+ res_large = bootstrap_confidence_intervals(large, ["reward"], n_bootstrap=400)
+ _, lo_s, hi_s = list(res_small.values())[0]
+ _, lo_l, hi_l = list(res_large.values())[0]
+ hw_small = (hi_s - lo_s) / 2.0
+ hw_large = (hi_l - lo_l) / 2.0
+ self.assertFinite(hw_small, name="hw_small")
+ self.assertFinite(hw_large, name="hw_large")
+ self.assertLess(hw_large, hw_small * 0.55)
+
+ def test_stats_bootstrap_constant_distribution_and_diagnostics(self):
+ """Bootstrap on degenerate columns produce (mean≈lo≈hi) zero-width intervals."""
+ df = self._const_df(80)
+ res = bootstrap_confidence_intervals(
+ df, ["reward", "pnl"], n_bootstrap=200, confidence_level=0.95
+ )
+ for _metric, (mean, lo, hi) in res.items():
+ self.assertAlmostEqualFloat(mean, lo, tolerance=2e-09)
+ self.assertAlmostEqualFloat(mean, hi, tolerance=2e-09)
+ self.assertLessEqual(hi - lo, 2e-09)
if "effect_size_rank_biserial" in res:
rb = res["effect_size_rank_biserial"]
self.assertFinite(rb)
import warnings
from pathlib import Path
-import numpy as np
import pandas as pd
from reward_space_analysis import (
- DEFAULT_MODEL_REWARD_PARAMETERS,
PBRS_INVARIANCE_TOL,
- _get_float_param,
apply_potential_shaping,
- bootstrap_confidence_intervals,
load_real_episodes,
- simulate_samples,
)
from .test_base import RewardSpaceTestBase
pickle.dump(obj, f)
def test_top_level_dict_transitions(self):
+ """Test top level dict transitions."""
df = pd.DataFrame(
{
"pnl": [0.01],
self.assertEqual(len(loaded), 1)
def test_mixed_episode_list_warns_and_flattens(self):
+ """Test mixed episode list warns and flattens."""
ep1 = {"episode_id": 1}
ep2 = {
"episode_id": 2,
self.assertPlacesEqual(float(loaded.iloc[0]["pnl"]), 0.02, places=7)
def test_non_iterable_transitions_raises(self):
+ """Test non iterable transitions raises."""
bad = {"transitions": 123}
p = Path(self.temp_dir) / "bad.pkl"
self.write_pickle(bad, p)
load_real_episodes(p)
def test_enforce_columns_false_fills_na(self):
+ """Test enforce columns false fills na."""
trans = [
{"pnl": 0.03, "trade_duration": 10, "idle_duration": 0, "position": 1.0, "action": 2.0}
]
self.assertTrue(loaded["reward"].isna().all())
def test_casting_numeric_strings(self):
+ """Test casting numeric strings."""
trans = [
{
"pnl": "0.04",
self.assertIn("pnl", loaded_data.columns)
-class TestBootstrapStatistics(RewardSpaceTestBase):
- """Grouped tests for bootstrap confidence interval behavior."""
-
- def test_constant_distribution_bootstrap_and_diagnostics(self):
- """Degenerate columns produce (mean≈lo≈hi) zero-width intervals."""
- df = self._const_df(80)
- res = bootstrap_confidence_intervals(
- df, ["reward", "pnl"], n_bootstrap=200, confidence_level=0.95
- )
- for k, (mean, lo, hi) in res.items():
- self.assertAlmostEqualFloat(mean, lo, tolerance=2e-09)
- self.assertAlmostEqualFloat(mean, hi, tolerance=2e-09)
- self.assertLessEqual(hi - lo, 2e-09)
-
- def test_bootstrap_shrinkage_with_sample_size(self):
- """Half-width decreases with larger sample (~1/sqrt(n) heuristic)."""
- small = self._shift_scale_df(80)
- large = self._shift_scale_df(800)
- res_small = bootstrap_confidence_intervals(small, ["reward"], n_bootstrap=400)
- res_large = bootstrap_confidence_intervals(large, ["reward"], n_bootstrap=400)
- _, lo_s, hi_s = list(res_small.values())[0]
- _, lo_l, hi_l = list(res_large.values())[0]
- hw_small = (hi_s - lo_s) / 2.0
- hw_large = (hi_l - lo_l) / 2.0
- self.assertFinite(hw_small, name="hw_small")
- self.assertFinite(hw_large, name="hw_large")
- self.assertLess(hw_large, hw_small * 0.55)
-
- def test_bootstrap_confidence_intervals_basic(self):
- """Basic CI computation returns ordered finite bounds."""
- test_data = self.make_stats_df(n=100, seed=self.SEED)
- results = bootstrap_confidence_intervals(test_data, ["reward", "pnl"], n_bootstrap=100)
- for metric, (mean, ci_low, ci_high) in results.items():
- self.assertFinite(mean, name=f"mean[{metric}]")
- self.assertFinite(ci_low, name=f"ci_low[{metric}]")
- self.assertFinite(ci_high, name=f"ci_high[{metric}]")
- self.assertLess(ci_low, ci_high)
-
- def test_canonical_invariance_flag_and_sum(self):
- """Canonical mode + no additives -> pbrs_invariant True and Σ shaping ≈ 0."""
- params = self.base_params(
- exit_potential_mode="canonical",
- entry_additive_enabled=False,
- exit_additive_enabled=False,
- hold_potential_enabled=True,
- )
- df = simulate_samples(
- params={**params, "max_trade_duration_candles": 100},
- num_samples=400,
- seed=self.SEED,
- base_factor=self.TEST_BASE_FACTOR,
- profit_target=self.TEST_PROFIT_TARGET,
- risk_reward_ratio=self.TEST_RR,
- max_duration_ratio=2.0,
- trading_mode="margin",
- pnl_base_std=self.TEST_PNL_STD,
- pnl_duration_vol_scale=self.TEST_PNL_DUR_VOL_SCALE,
- )
- unique_flags = set(df["pbrs_invariant"].unique().tolist())
- self.assertEqual(unique_flags, {True}, f"Unexpected invariant flags: {unique_flags}")
- total_shaping = float(df["reward_shaping"].sum())
- self.assertLess(
- abs(total_shaping),
- PBRS_INVARIANCE_TOL,
- f"Canonical invariance violated: Σ shaping = {total_shaping}",
- )
-
- def test_non_canonical_flag_false_and_sum_nonzero(self):
- """Non-canonical exit potential (progressive_release) -> pbrs_invariant False and Σ shaping != 0."""
- params = self.base_params(
- exit_potential_mode="progressive_release",
- exit_potential_decay=0.25,
- entry_additive_enabled=False,
- exit_additive_enabled=False,
- hold_potential_enabled=True,
- )
- df = simulate_samples(
- params={**params, "max_trade_duration_candles": 100},
- num_samples=400,
- seed=self.SEED,
- base_factor=self.TEST_BASE_FACTOR,
- profit_target=self.TEST_PROFIT_TARGET,
- risk_reward_ratio=self.TEST_RR,
- max_duration_ratio=2.0,
- trading_mode="margin",
- pnl_base_std=self.TEST_PNL_STD,
- pnl_duration_vol_scale=self.TEST_PNL_DUR_VOL_SCALE,
- )
- unique_flags = set(df["pbrs_invariant"].unique().tolist())
- self.assertEqual(unique_flags, {False}, f"Unexpected invariant flags: {unique_flags}")
- total_shaping = float(df["reward_shaping"].sum())
- self.assertGreater(
- abs(total_shaping),
- PBRS_INVARIANCE_TOL * 10,
- f"Expected non-zero Σ shaping in non-canonical mode (got {total_shaping})",
- )
-
-
class TestReportFormatting(RewardSpaceTestBase):
"""Tests for report formatting elements not covered elsewhere."""
"Tolerance constant value should appear, not raw literal",
)
- def test_pbrs_non_canonical_report_generation(self):
- """Generate synthetic invariance section with non-zero shaping to assert Non-canonical classification."""
- df = pd.DataFrame(
- {
- "reward_shaping": [0.01, -0.002],
- "reward_entry_additive": [0.0, 0.0],
- "reward_exit_additive": [0.001, 0.0],
- }
- )
- total_shaping = df["reward_shaping"].sum()
- self.assertGreater(abs(total_shaping), PBRS_INVARIANCE_TOL)
- invariance_status = "❌ Non-canonical"
- section = []
- section.append("**PBRS Invariance Summary:**\n")
- section.append("| Field | Value |\n")
- section.append("|-------|-------|\n")
- section.append(f"| Invariance | {invariance_status} |\n")
- section.append(f"| Note | Total shaping = {total_shaping:.6f} (non-zero) |\n")
- section.append(f"| Σ Shaping Reward | {total_shaping:.6f} |\n")
- section.append(f"| Abs Σ Shaping Reward | {abs(total_shaping):.6e} |\n")
- section.append(f"| Σ Entry Additive | {df['reward_entry_additive'].sum():.6f} |\n")
- section.append(f"| Σ Exit Additive | {df['reward_exit_additive'].sum():.6f} |\n")
- content = "".join(section)
- self.assertIn("❌ Non-canonical", content)
- self.assertRegex(content, "Σ Shaping Reward \\| 0\\.008000 \\|")
- m_abs = re.search("Abs Σ Shaping Reward \\| ([0-9.]+e[+-][0-9]{2}) \\|", content)
- self.assertIsNotNone(m_abs)
- if m_abs:
- self.assertAlmostEqual(abs(total_shaping), float(m_abs.group(1)), places=12)
-
def test_additive_activation_deterministic_contribution(self):
"""Additives enabled increase total reward; shaping impact limited."""
base = self.base_params(
self.assertLess(abs(s1 - s0), 0.2)
self.assertGreater(t1 - _t0, 0.0, "Total reward should increase with additives present")
- def test_report_cumulative_invariance_aggregation(self):
- """Canonical telescoping term: small per-step mean drift, bounded increments."""
- params = self.base_params(
- hold_potential_enabled=True,
- entry_additive_enabled=False,
- exit_additive_enabled=False,
- exit_potential_mode="canonical",
- )
- gamma = _get_float_param(
- params, "potential_gamma", DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95)
- )
- rng = np.random.default_rng(321)
- last_potential = 0.0
- telescoping_sum = 0.0
- max_abs_step = 0.0
- steps = 0
- for _ in range(500):
- is_exit = rng.uniform() < 0.1
- current_pnl = float(rng.normal(0, 0.05))
- current_dur = float(rng.uniform(0, 1))
- next_pnl = 0.0 if is_exit else float(rng.normal(0, 0.05))
- next_dur = 0.0 if is_exit else float(rng.uniform(0, 1))
- _tot, _shap, next_potential = apply_potential_shaping(
- base_reward=0.0,
- current_pnl=current_pnl,
- current_duration_ratio=current_dur,
- next_pnl=next_pnl,
- next_duration_ratio=next_dur,
- is_exit=is_exit,
- last_potential=last_potential,
- params=params,
- )
- inc = gamma * next_potential - last_potential
- telescoping_sum += inc
- if abs(inc) > max_abs_step:
- max_abs_step = abs(inc)
- steps += 1
- if is_exit:
- last_potential = 0.0
- else:
- last_potential = next_potential
- mean_drift = telescoping_sum / max(1, steps)
- self.assertLess(
- abs(mean_drift),
- 0.02,
- f"Per-step telescoping drift too large (mean={mean_drift}, steps={steps})",
- )
- self.assertLessEqual(
- max_abs_step,
- self.PBRS_MAX_ABS_SHAPING,
- f"Unexpected large telescoping increment (max={max_abs_step})",
- )
-
- def test_report_explicit_non_invariance_progressive_release(self):
- """progressive_release should generally yield non-zero cumulative shaping (release leak)."""
- params = self.base_params(
- hold_potential_enabled=True,
- entry_additive_enabled=False,
- exit_additive_enabled=False,
- exit_potential_mode="progressive_release",
- exit_potential_decay=0.25,
- )
- rng = np.random.default_rng(321)
- last_potential = 0.0
- shaping_sum = 0.0
- for _ in range(160):
- is_exit = rng.uniform() < 0.15
- next_pnl = 0.0 if is_exit else float(rng.normal(0, 0.07))
- next_dur = 0.0 if is_exit else float(rng.uniform(0, 1))
- _tot, shap, next_pot = apply_potential_shaping(
- base_reward=0.0,
- current_pnl=float(rng.normal(0, 0.07)),
- current_duration_ratio=float(rng.uniform(0, 1)),
- next_pnl=next_pnl,
- next_duration_ratio=next_dur,
- is_exit=is_exit,
- last_potential=last_potential,
- params=params,
- )
- shaping_sum += shap
- last_potential = 0.0 if is_exit else next_pot
- self.assertGreater(
- abs(shaping_sum),
- PBRS_INVARIANCE_TOL * 50,
- f"Expected non-zero Σ shaping (got {shaping_sum})",
- )
-
- def test_gamma_extremes(self):
- """Gamma=0 and gamma≈1 boundary behaviours produce bounded shaping and finite potentials."""
- for gamma in [0.0, 0.999999]:
- params = self.base_params(
- hold_potential_enabled=True,
- entry_additive_enabled=False,
- exit_additive_enabled=False,
- exit_potential_mode="canonical",
- potential_gamma=gamma,
- )
- _tot, shap, next_pot = apply_potential_shaping(
- base_reward=0.0,
- current_pnl=0.02,
- current_duration_ratio=0.3,
- next_pnl=0.025,
- next_duration_ratio=0.35,
- is_exit=False,
- last_potential=0.0,
- params=params,
- )
- self.assertTrue(np.isfinite(shap))
- self.assertTrue(np.isfinite(next_pot))
- self.assertLessEqual(abs(shap), self.PBRS_MAX_ABS_SHAPING)
-
class TestCsvAndSimulationOptions(RewardSpaceTestBase):
"""CLI-level tests: CSV encoding and simulate_unrealized_pnl option effects."""
allowed = {0, 1, 2, 3, 4}
self.assertTrue(set((int(v) for v in values)).issubset(allowed))
- def test_unrealized_pnl_affects_hold_potential(self):
- """--unrealized_pnl should alter hold next_potential distribution vs default."""
- out_default = self.output_path / "sim_default"
- out_sim = self.output_path / "sim_unrealized"
- base_args = ["--num_samples", "800", "--seed", str(self.SEED), "--out_dir"]
- cmd_default = [sys.executable, "reward_space_analysis.py", *base_args, str(out_default)]
- res_def = subprocess.run(
- cmd_default, capture_output=True, text=True, cwd=Path(__file__).parent.parent
- )
- self.assertEqual(res_def.returncode, 0, f"CLI default run failed: {res_def.stderr}")
- cmd_sim = [
- sys.executable,
- "reward_space_analysis.py",
- *base_args,
- str(out_sim),
- "--unrealized_pnl",
- ]
- res_sim = subprocess.run(
- cmd_sim, capture_output=True, text=True, cwd=Path(__file__).parent.parent
- )
- self.assertEqual(res_sim.returncode, 0, f"CLI simulated run failed: {res_sim.stderr}")
- df_def = pd.read_csv(out_default / "reward_samples.csv")
- df_sim = pd.read_csv(out_sim / "reward_samples.csv")
- mask_hold_def = (df_def["action"] == 0) & df_def["position"].isin([0.0, 1.0])
- mask_hold_sim = (df_sim["action"] == 0) & df_sim["position"].isin([0.0, 1.0])
- self.assertGreater(int(mask_hold_def.sum()), 0, "No hold samples in default run")
- self.assertGreater(int(mask_hold_sim.sum()), 0, "No hold samples in simulate run")
- mean_next_def = float(df_def.loc[mask_hold_def, "next_potential"].mean())
- mean_next_sim = float(df_sim.loc[mask_hold_sim, "next_potential"].mean())
- self.assertFinite(mean_next_def, name="mean_next_def")
- self.assertFinite(mean_next_sim, name="mean_next_sim")
- self.assertGreater(
- abs(mean_next_sim - mean_next_def),
- self.TOL_GENERIC_EQ,
- f"No detectable effect of --unrealized_pnl on Φ(s): def={mean_next_def:.6f}, sim={mean_next_sim:.6f}",
- )
-
class TestParamsPropagation(RewardSpaceTestBase):
"""Integration tests to validate max_trade_duration_candles propagation via CLI params and dynamic flag."""
{ url = "https://files.pythonhosted.org/packages/ec/16/114df1c291c22cac3b0c127a73e0af5c12ed7bbb6558d310429a0ae24023/coverage-7.10.7-py3-none-any.whl", hash = "sha256:f7941f6f2fe6dd6807a1208737b8a0cbcf1cc6d7b07d24998ad2d63590868260", size = 209952, upload-time = "2025-09-21T20:03:53.918Z" },
]
+[package.optional-dependencies]
+toml = [
+ { name = "tomli", marker = "python_full_version < '3.10'" },
+]
+
[[package]]
name = "coverage"
version = "7.11.0"
{ url = "https://files.pythonhosted.org/packages/5f/04/642c1d8a448ae5ea1369eac8495740a79eb4e581a9fb0cbdce56bbf56da1/coverage-7.11.0-py3-none-any.whl", hash = "sha256:4b7589765348d78fb4e5fb6ea35d07564e387da2fc5efff62e0222971f155f68", size = 207761, upload-time = "2025-10-15T15:15:06.439Z" },
]
+[package.optional-dependencies]
+toml = [
+ { name = "tomli", marker = "python_full_version >= '3.10' and python_full_version <= '3.11'" },
+]
+
[[package]]
name = "exceptiongroup"
version = "1.3.0"
{ url = "https://files.pythonhosted.org/packages/a8/a4/20da314d277121d6534b3a980b29035dcd51e6744bd79075a6ce8fa4eb8d/pytest-8.4.2-py3-none-any.whl", hash = "sha256:872f880de3fc3a5bdc88a11b39c9710c3497a547cfa9320bc3c5e62fbf272e79", size = 365750, upload-time = "2025-09-04T14:34:20.226Z" },
]
+[[package]]
+name = "pytest-cov"
+version = "7.0.0"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+ { name = "coverage", version = "7.10.7", source = { registry = "https://pypi.org/simple" }, extra = ["toml"], marker = "python_full_version < '3.10'" },
+ { name = "coverage", version = "7.11.0", source = { registry = "https://pypi.org/simple" }, extra = ["toml"], marker = "python_full_version >= '3.10'" },
+ { name = "pluggy" },
+ { name = "pytest" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/5e/f7/c933acc76f5208b3b00089573cf6a2bc26dc80a8aece8f52bb7d6b1855ca/pytest_cov-7.0.0.tar.gz", hash = "sha256:33c97eda2e049a0c5298e91f519302a1334c26ac65c1a483d6206fd458361af1", size = 54328, upload-time = "2025-09-09T10:57:02.113Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" },
+]
+
[[package]]
name = "python-dateutil"
version = "2.9.0.post0"
{ name = "coverage", version = "7.10.7", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" },
{ name = "coverage", version = "7.11.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" },
{ name = "pytest" },
+ { name = "pytest-cov" },
{ name = "ruff" },
]
dev = [
{ name = "coverage" },
{ name = "pytest", specifier = ">=6.0" },
+ { name = "pytest-cov", specifier = ">=7.0.0" },
{ name = "ruff" },
]