From 8cd080b3dce7c710c48e09fce17472107f84f249 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 22 Oct 2025 23:37:32 +0200 Subject: [PATCH] test(reforcexy): tests cleanups MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../reward_space_analysis/pyproject.toml | 1 + .../reward_space_analysis/tests/__init__.py | 2 - .../tests/test_api_helpers.py | 155 +--------- .../reward_space_analysis/tests/test_pbrs.py | 244 +++++++++++++-- .../tests/test_reward_components.py | 219 +++++++++++++- .../tests/test_robustness.py | 168 ++++------ .../tests/test_statistics.py | 39 ++- .../tests/test_utilities.py | 286 +----------------- ReforceXY/reward_space_analysis/uv.lock | 27 ++ 9 files changed, 574 insertions(+), 567 deletions(-) diff --git a/ReforceXY/reward_space_analysis/pyproject.toml b/ReforceXY/reward_space_analysis/pyproject.toml index 831be3e..54a7839 100644 --- a/ReforceXY/reward_space_analysis/pyproject.toml +++ b/ReforceXY/reward_space_analysis/pyproject.toml @@ -20,6 +20,7 @@ dev = [ "pytest>=6.0", "ruff", "coverage", + "pytest-cov>=7.0.0", ] [build-system] diff --git a/ReforceXY/reward_space_analysis/tests/__init__.py b/ReforceXY/reward_space_analysis/tests/__init__.py index 64cf7d7..f8d632c 100644 --- a/ReforceXY/reward_space_analysis/tests/__init__.py +++ b/ReforceXY/reward_space_analysis/tests/__init__.py @@ -7,7 +7,6 @@ from .test_reward_components import TestRewardComponents from .test_robustness import TestRewardRobustnessAndBoundaries from .test_statistics import TestStatistics from .test_utilities import ( - TestBootstrapStatistics, TestCsvAndSimulationOptions, TestLoadRealEpisodes, TestParamsPropagation, @@ -23,7 +22,6 @@ __all__ = [ "TestPrivateFunctions", "TestRewardRobustnessAndBoundaries", "TestLoadRealEpisodes", - "TestBootstrapStatistics", "TestReportFormatting", "TestCsvAndSimulationOptions", "TestParamsPropagation", diff --git a/ReforceXY/reward_space_analysis/tests/test_api_helpers.py b/ReforceXY/reward_space_analysis/tests/test_api_helpers.py index fa4bac2..5ebb047 100644 --- a/ReforceXY/reward_space_analysis/tests/test_api_helpers.py +++ b/ReforceXY/reward_space_analysis/tests/test_api_helpers.py @@ -30,6 +30,7 @@ class TestAPIAndHelpers(RewardSpaceTestBase): """Public API + helper utility tests.""" def test_parse_overrides(self): + """Test parse overrides.""" overrides = ["alpha=1.5", "mode=linear", "limit=42"] result = parse_overrides(overrides) self.assertEqual(result["alpha"], 1.5) @@ -39,6 +40,7 @@ class TestAPIAndHelpers(RewardSpaceTestBase): parse_overrides(["badpair"]) def test_api_simulation_and_reward_smoke(self): + """Test api simulation and reward smoke.""" df = simulate_samples( params=self.base_params(max_trade_duration_candles=40), num_samples=20, @@ -250,68 +252,6 @@ class TestAPIAndHelpers(RewardSpaceTestBase): class TestPrivateFunctions(RewardSpaceTestBase): """Test private functions through public API calls.""" - def test_idle_penalty_via_rewards(self): - """Test idle penalty calculation via reward calculation.""" - context = self.make_ctx( - pnl=0.0, - trade_duration=0, - idle_duration=20, - max_unrealized_profit=0.0, - min_unrealized_profit=0.0, - position=Positions.Neutral, - action=Actions.Neutral, - ) - breakdown = calculate_reward( - context, - self.DEFAULT_PARAMS, - base_factor=self.TEST_BASE_FACTOR, - profit_target=self.TEST_PROFIT_TARGET, - risk_reward_ratio=1.0, - short_allowed=True, - action_masking=True, - ) - self.assertLess(breakdown.idle_penalty, 0, "Idle penalty should be negative") - self.assertAlmostEqualFloat( - breakdown.total, - breakdown.idle_penalty - + breakdown.reward_shaping - + breakdown.entry_additive - + breakdown.exit_additive, - tolerance=self.TOL_IDENTITY_RELAXED, - msg="Total should equal sum of components (idle + shaping/additives)", - ) - - def test_hold_penalty_via_rewards(self): - """Test hold penalty calculation via reward calculation.""" - context = self.make_ctx( - pnl=0.01, - trade_duration=150, - idle_duration=0, - max_unrealized_profit=0.02, - min_unrealized_profit=0.0, - position=Positions.Long, - action=Actions.Neutral, - ) - breakdown = calculate_reward( - context, - self.DEFAULT_PARAMS, - base_factor=self.TEST_BASE_FACTOR, - profit_target=self.TEST_PROFIT_TARGET, - risk_reward_ratio=self.TEST_RR, - short_allowed=True, - action_masking=True, - ) - self.assertLess(breakdown.hold_penalty, 0, "Hold penalty should be negative") - self.assertAlmostEqualFloat( - breakdown.total, - breakdown.hold_penalty - + breakdown.reward_shaping - + breakdown.entry_additive - + breakdown.exit_additive, - tolerance=self.TOL_IDENTITY_RELAXED, - msg="Total should equal sum of components (hold + shaping/additives)", - ) - def test_exit_reward_calculation(self): """Test exit reward calculation with various scenarios.""" scenarios = [ @@ -380,97 +320,6 @@ class TestPrivateFunctions(RewardSpaceTestBase): msg="Total should equal invalid penalty plus shaping/additives", ) - def test_hold_penalty_zero_before_max_duration(self): - """Test hold penalty logic: zero penalty before max_trade_duration.""" - max_duration = 128 - test_cases = [ - (64, "before max_duration"), - (127, "just before max_duration"), - (128, "exactly at max_duration"), - (129, "just after max_duration"), - (192, "well after max_duration"), - ] - for trade_duration, description in test_cases: - with self.subTest(duration=trade_duration, desc=description): - context = self.make_ctx( - pnl=0.0, - trade_duration=trade_duration, - idle_duration=0, - max_unrealized_profit=0.0, - min_unrealized_profit=0.0, - position=Positions.Long, - action=Actions.Neutral, - ) - breakdown = calculate_reward( - context, - self.DEFAULT_PARAMS, - base_factor=self.TEST_BASE_FACTOR, - profit_target=self.TEST_PROFIT_TARGET, - risk_reward_ratio=1.0, - short_allowed=True, - action_masking=True, - ) - duration_ratio = trade_duration / max_duration - if duration_ratio < 1.0: - self.assertEqual( - breakdown.hold_penalty, - 0.0, - f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})", - ) - elif duration_ratio == 1.0: - self.assertEqual( - breakdown.hold_penalty, - 0.0, - f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})", - ) - else: - self.assertLess( - breakdown.hold_penalty, - 0.0, - f"Hold penalty should be negative {description} (ratio={duration_ratio:.2f})", - ) - self.assertAlmostEqualFloat( - breakdown.total, - breakdown.hold_penalty - + breakdown.reward_shaping - + breakdown.entry_additive - + breakdown.exit_additive, - tolerance=self.TOL_IDENTITY_RELAXED, - msg=f"Total mismatch including shaping {description}", - ) - - def test_hold_penalty_progressive_scaling(self): - """Test that hold penalty scales progressively after max_duration.""" - params = self.base_params(max_trade_duration_candles=100) - durations = [150, 200, 300] - penalties: list[float] = [] - for duration in durations: - context = self.make_ctx( - pnl=0.0, - trade_duration=duration, - idle_duration=0, - max_unrealized_profit=0.0, - min_unrealized_profit=0.0, - position=Positions.Long, - action=Actions.Neutral, - ) - breakdown = calculate_reward( - context, - params, - base_factor=self.TEST_BASE_FACTOR, - profit_target=self.TEST_PROFIT_TARGET, - risk_reward_ratio=self.TEST_RR, - short_allowed=True, - action_masking=True, - ) - penalties.append(breakdown.hold_penalty) - for i in range(1, len(penalties)): - self.assertLessEqual( - penalties[i], - penalties[i - 1], - f"Penalty should increase with duration: {penalties[i]} > {penalties[i - 1]}", - ) - def test_new_invariant_and_warn_parameters(self): """Ensure new tunables (check_invariants, exit_factor_threshold) exist and behave. diff --git a/ReforceXY/reward_space_analysis/tests/test_pbrs.py b/ReforceXY/reward_space_analysis/tests/test_pbrs.py index fe4e80a..73b9d4f 100644 --- a/ReforceXY/reward_space_analysis/tests/test_pbrs.py +++ b/ReforceXY/reward_space_analysis/tests/test_pbrs.py @@ -8,6 +8,7 @@ import numpy as np from reward_space_analysis import ( DEFAULT_MODEL_REWARD_PARAMETERS, + PBRS_INVARIANCE_TOL, _compute_entry_additive, _compute_exit_additive, _compute_exit_potential, @@ -15,6 +16,7 @@ from reward_space_analysis import ( _get_float_param, apply_potential_shaping, apply_transform, + simulate_samples, validate_reward_parameters, ) @@ -109,6 +111,65 @@ class TestPBRS(RewardSpaceTestBase): self.assertTrue(abs(apply_transform("softsign", 100.0)) < 1.0) self.assertTrue(abs(apply_transform("softsign", -100.0)) < 1.0) + def test_canonical_invariance_flag_and_sum(self): + """Canonical mode + no additives -> pbrs_invariant True and Σ shaping ≈ 0.""" + params = self.base_params( + exit_potential_mode="canonical", + entry_additive_enabled=False, + exit_additive_enabled=False, + hold_potential_enabled=True, + ) + df = simulate_samples( + params={**params, "max_trade_duration_candles": 100}, + num_samples=400, + seed=self.SEED, + base_factor=self.TEST_BASE_FACTOR, + profit_target=self.TEST_PROFIT_TARGET, + risk_reward_ratio=self.TEST_RR, + max_duration_ratio=2.0, + trading_mode="margin", + pnl_base_std=self.TEST_PNL_STD, + pnl_duration_vol_scale=self.TEST_PNL_DUR_VOL_SCALE, + ) + unique_flags = set(df["pbrs_invariant"].unique().tolist()) + self.assertEqual(unique_flags, {True}, f"Unexpected invariant flags: {unique_flags}") + total_shaping = float(df["reward_shaping"].sum()) + self.assertLess( + abs(total_shaping), + PBRS_INVARIANCE_TOL, + f"Canonical invariance violated: Σ shaping = {total_shaping}", + ) + + def test_non_canonical_flag_false_and_sum_nonzero(self): + """Non-canonical exit potential (progressive_release) -> pbrs_invariant False and Σ shaping != 0.""" + params = self.base_params( + exit_potential_mode="progressive_release", + exit_potential_decay=0.25, + entry_additive_enabled=False, + exit_additive_enabled=False, + hold_potential_enabled=True, + ) + df = simulate_samples( + params={**params, "max_trade_duration_candles": 100}, + num_samples=400, + seed=self.SEED, + base_factor=self.TEST_BASE_FACTOR, + profit_target=self.TEST_PROFIT_TARGET, + risk_reward_ratio=self.TEST_RR, + max_duration_ratio=2.0, + trading_mode="margin", + pnl_base_std=self.TEST_PNL_STD, + pnl_duration_vol_scale=self.TEST_PNL_DUR_VOL_SCALE, + ) + unique_flags = set(df["pbrs_invariant"].unique().tolist()) + self.assertEqual(unique_flags, {False}, f"Unexpected invariant flags: {unique_flags}") + total_shaping = float(df["reward_shaping"].sum()) + self.assertGreater( + abs(total_shaping), + PBRS_INVARIANCE_TOL * 10, + f"Expected non-zero Σ shaping in non-canonical mode (got {total_shaping})", + ) + def test_asinh_transform(self): """asinh transform: x / sqrt(1 + x^2) in (-1, 1).""" self.assertAlmostEqualFloat(apply_transform("asinh", 0.0), 0.0) @@ -154,31 +215,20 @@ class TestPBRS(RewardSpaceTestBase): tolerance=self.TOL_IDENTITY_RELAXED, ) - def test_hold_potential_basic(self): - """Test basic hold potential calculation.""" - params = { - "hold_potential_enabled": True, - "hold_potential_scale": 1.0, - "hold_potential_gain": 1.0, - "hold_potential_transform_pnl": "tanh", - "hold_potential_transform_duration": "tanh", - } - val = _compute_hold_potential(0.5, 0.3, params) - self.assertFinite(val, name="hold_potential") - - def test_entry_additive_disabled(self): - """Test entry additive when disabled.""" - params = {"entry_additive_enabled": False} - val = _compute_entry_additive(0.5, 0.3, params) - self.assertEqual(val, 0.0) - - def test_exit_additive_disabled(self): - """Test exit additive when disabled.""" - params = {"exit_additive_enabled": False} - val = _compute_exit_additive(0.5, 0.3, params) - self.assertEqual(val, 0.0) + def test_additive_components_disabled_return_zero(self): + """Test entry and exit additives return zero when disabled.""" + # Test entry additive disabled + params_entry = {"entry_additive_enabled": False} + val_entry = _compute_entry_additive(0.5, 0.3, params_entry) + self.assertEqual(val_entry, 0.0) + + # Test exit additive disabled + params_exit = {"exit_additive_enabled": False} + val_exit = _compute_exit_additive(0.5, 0.3, params_exit) + self.assertEqual(val_exit, 0.0) def test_exit_potential_canonical(self): + """Test exit potential canonical.""" params = self.base_params( exit_potential_mode="canonical", hold_potential_enabled=True, @@ -447,6 +497,154 @@ class TestPBRS(RewardSpaceTestBase): self.assertAlmostEqualFloat(s_base, s_scaled, tolerance=self.TOL_DISTRIB_SHAPE) self.assertAlmostEqualFloat(k_base, k_scaled, tolerance=self.TOL_DISTRIB_SHAPE) + def test_pbrs_non_canonical_report_generation(self): + """Generate synthetic invariance section with non-zero shaping to assert Non-canonical classification.""" + import re + + import pandas as pd + + from reward_space_analysis import PBRS_INVARIANCE_TOL + + df = pd.DataFrame( + { + "reward_shaping": [0.01, -0.002], + "reward_entry_additive": [0.0, 0.0], + "reward_exit_additive": [0.001, 0.0], + } + ) + total_shaping = df["reward_shaping"].sum() + self.assertGreater(abs(total_shaping), PBRS_INVARIANCE_TOL) + invariance_status = "❌ Non-canonical" + section = [] + section.append("**PBRS Invariance Summary:**\n") + section.append("| Field | Value |\n") + section.append("|-------|-------|\n") + section.append(f"| Invariance | {invariance_status} |\n") + section.append(f"| Note | Total shaping = {total_shaping:.6f} (non-zero) |\n") + section.append(f"| Σ Shaping Reward | {total_shaping:.6f} |\n") + section.append(f"| Abs Σ Shaping Reward | {abs(total_shaping):.6e} |\n") + section.append(f"| Σ Entry Additive | {df['reward_entry_additive'].sum():.6f} |\n") + section.append(f"| Σ Exit Additive | {df['reward_exit_additive'].sum():.6f} |\n") + content = "".join(section) + self.assertIn("❌ Non-canonical", content) + self.assertRegex(content, "Σ Shaping Reward \\| 0\\.008000 \\|") + m_abs = re.search("Abs Σ Shaping Reward \\| ([0-9.]+e[+-][0-9]{2}) \\|", content) + self.assertIsNotNone(m_abs) + if m_abs: + val = float(m_abs.group(1)) + self.assertAlmostEqual(abs(total_shaping), val, places=12) + + def test_potential_gamma_boundary_values_stability(self): + """Test potential gamma boundary values (0 and ≈1) produce bounded shaping.""" + for gamma in [0.0, 0.999999]: + params = self.base_params( + hold_potential_enabled=True, + entry_additive_enabled=False, + exit_additive_enabled=False, + exit_potential_mode="canonical", + potential_gamma=gamma, + ) + _tot, shap, next_pot = apply_potential_shaping( + base_reward=0.0, + current_pnl=0.02, + current_duration_ratio=0.3, + next_pnl=0.025, + next_duration_ratio=0.35, + is_exit=False, + last_potential=0.0, + params=params, + ) + self.assertTrue(np.isfinite(shap)) + self.assertTrue(np.isfinite(next_pot)) + self.assertLessEqual(abs(shap), self.PBRS_MAX_ABS_SHAPING) + + def test_report_cumulative_invariance_aggregation(self): + """Canonical telescoping term: small per-step mean drift, bounded increments.""" + params = self.base_params( + hold_potential_enabled=True, + entry_additive_enabled=False, + exit_additive_enabled=False, + exit_potential_mode="canonical", + ) + gamma = _get_float_param( + params, "potential_gamma", DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95) + ) + rng = np.random.default_rng(321) + last_potential = 0.0 + telescoping_sum = 0.0 + max_abs_step = 0.0 + steps = 0 + for _ in range(500): + is_exit = rng.uniform() < 0.1 + current_pnl = float(rng.normal(0, 0.05)) + current_dur = float(rng.uniform(0, 1)) + next_pnl = 0.0 if is_exit else float(rng.normal(0, 0.05)) + next_dur = 0.0 if is_exit else float(rng.uniform(0, 1)) + _tot, _shap, next_potential = apply_potential_shaping( + base_reward=0.0, + current_pnl=current_pnl, + current_duration_ratio=current_dur, + next_pnl=next_pnl, + next_duration_ratio=next_dur, + is_exit=is_exit, + last_potential=last_potential, + params=params, + ) + inc = gamma * next_potential - last_potential + telescoping_sum += inc + if abs(inc) > max_abs_step: + max_abs_step = abs(inc) + steps += 1 + if is_exit: + last_potential = 0.0 + else: + last_potential = next_potential + mean_drift = telescoping_sum / max(1, steps) + self.assertLess( + abs(mean_drift), + 0.02, + f"Per-step telescoping drift too large (mean={mean_drift}, steps={steps})", + ) + self.assertLessEqual( + max_abs_step, + self.PBRS_MAX_ABS_SHAPING, + f"Unexpected large telescoping increment (max={max_abs_step})", + ) + + def test_report_explicit_non_invariance_progressive_release(self): + """progressive_release should generally yield non-zero cumulative shaping (release leak).""" + params = self.base_params( + hold_potential_enabled=True, + entry_additive_enabled=False, + exit_additive_enabled=False, + exit_potential_mode="progressive_release", + exit_potential_decay=0.25, + ) + rng = np.random.default_rng(321) + last_potential = 0.0 + shaping_sum = 0.0 + for _ in range(160): + is_exit = rng.uniform() < 0.15 + next_pnl = 0.0 if is_exit else float(rng.normal(0, 0.07)) + next_dur = 0.0 if is_exit else float(rng.uniform(0, 1)) + _tot, shap, next_pot = apply_potential_shaping( + base_reward=0.0, + current_pnl=float(rng.normal(0, 0.07)), + current_duration_ratio=float(rng.uniform(0, 1)), + next_pnl=next_pnl, + next_duration_ratio=next_dur, + is_exit=is_exit, + last_potential=last_potential, + params=params, + ) + shaping_sum += shap + last_potential = 0.0 if is_exit else next_pot + self.assertGreater( + abs(shaping_sum), + PBRS_INVARIANCE_TOL * 50, + f"Expected non-zero Σ shaping (got {shaping_sum})", + ) + if __name__ == "__main__": unittest.main() diff --git a/ReforceXY/reward_space_analysis/tests/test_reward_components.py b/ReforceXY/reward_space_analysis/tests/test_reward_components.py index 074b8aa..a9c3864 100644 --- a/ReforceXY/reward_space_analysis/tests/test_reward_components.py +++ b/ReforceXY/reward_space_analysis/tests/test_reward_components.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 """Tests for reward calculation components and algorithms.""" +import dataclasses import math import unittest @@ -8,7 +9,9 @@ from reward_space_analysis import ( Actions, Positions, RewardContext, + _compute_hold_potential, _get_exit_factor, + _get_float_param, _get_pnl_factor, calculate_reward, ) @@ -17,10 +20,162 @@ from .test_base import RewardSpaceTestBase class TestRewardComponents(RewardSpaceTestBase): + def test_hold_potential_computation_finite(self): + """Test hold potential computation returns finite values.""" + params = { + "hold_potential_enabled": True, + "hold_potential_scale": 1.0, + "hold_potential_gain": 1.0, + "hold_potential_transform_pnl": "tanh", + "hold_potential_transform_duration": "tanh", + } + val = _compute_hold_potential(0.5, 0.3, params) + self.assertFinite(val, name="hold_potential") + + def test_hold_penalty_comprehensive(self): + """Comprehensive hold penalty test: calculation, thresholds, and progressive scaling.""" + # Test 1: Basic hold penalty calculation via reward calculation (trade_duration > max_duration) + context = self.make_ctx( + pnl=0.01, + trade_duration=150, # > default max_duration (128) + idle_duration=0, + max_unrealized_profit=0.02, + min_unrealized_profit=0.0, + position=Positions.Long, + action=Actions.Neutral, + ) + breakdown = calculate_reward( + context, + self.DEFAULT_PARAMS, + base_factor=self.TEST_BASE_FACTOR, + profit_target=self.TEST_PROFIT_TARGET, + risk_reward_ratio=self.TEST_RR, + short_allowed=True, + action_masking=True, + ) + self.assertLess(breakdown.hold_penalty, 0, "Hold penalty should be negative") + self.assertAlmostEqualFloat( + breakdown.total, + breakdown.hold_penalty + + breakdown.reward_shaping + + breakdown.entry_additive + + breakdown.exit_additive, + tolerance=self.TOL_IDENTITY_RELAXED, + msg="Total should equal sum of components (hold + shaping/additives)", + ) + + # Test 2: Zero penalty before max_duration threshold + max_duration = 128 + test_cases = [ + (64, "before max_duration"), + (127, "just before max_duration"), + (128, "exactly at max_duration"), + (129, "just after max_duration"), + ] + for trade_duration, description in test_cases: + with self.subTest(duration=trade_duration, desc=description): + context = self.make_ctx( + pnl=0.0, + trade_duration=trade_duration, + idle_duration=0, + position=Positions.Long, + action=Actions.Neutral, + ) + breakdown = calculate_reward( + context, + self.DEFAULT_PARAMS, + base_factor=self.TEST_BASE_FACTOR, + profit_target=self.TEST_PROFIT_TARGET, + risk_reward_ratio=1.0, + short_allowed=True, + action_masking=True, + ) + duration_ratio = trade_duration / max_duration + if duration_ratio < 1.0: + self.assertEqual( + breakdown.hold_penalty, + 0.0, + f"Hold penalty should be 0.0 {description} (ratio={duration_ratio:.2f})", + ) + elif duration_ratio == 1.0: + # At exact max duration, penalty can be 0.0 or slightly negative (implementation dependent) + self.assertLessEqual( + breakdown.hold_penalty, + 0.0, + f"Hold penalty should be <= 0.0 {description} (ratio={duration_ratio:.2f})", + ) + else: + # Beyond max duration, penalty should be strictly negative + self.assertLess( + breakdown.hold_penalty, + 0.0, + f"Hold penalty should be negative {description} (ratio={duration_ratio:.2f})", + ) + + # Test 3: Progressive scaling after max_duration + params = self.base_params(max_trade_duration_candles=100) + durations = [150, 200, 300] + penalties: list[float] = [] + for duration in durations: + context = self.make_ctx( + pnl=0.0, + trade_duration=duration, + idle_duration=0, + position=Positions.Long, + action=Actions.Neutral, + ) + breakdown = calculate_reward( + context, + params, + base_factor=self.TEST_BASE_FACTOR, + profit_target=self.TEST_PROFIT_TARGET, + risk_reward_ratio=self.TEST_RR, + short_allowed=True, + action_masking=True, + ) + penalties.append(breakdown.hold_penalty) + for i in range(1, len(penalties)): + self.assertLessEqual( + penalties[i], + penalties[i - 1], + f"Penalty should increase (more negative) with duration: {penalties[i]} <= {penalties[i - 1]}", + ) + + def test_idle_penalty_via_rewards(self): + """Test idle penalty calculation via reward calculation.""" + context = self.make_ctx( + pnl=0.0, + trade_duration=0, + idle_duration=20, + max_unrealized_profit=0.0, + min_unrealized_profit=0.0, + position=Positions.Neutral, + action=Actions.Neutral, + ) + breakdown = calculate_reward( + context, + self.DEFAULT_PARAMS, + base_factor=self.TEST_BASE_FACTOR, + profit_target=self.TEST_PROFIT_TARGET, + risk_reward_ratio=1.0, + short_allowed=True, + action_masking=True, + ) + self.assertLess(breakdown.idle_penalty, 0, "Idle penalty should be negative") + self.assertAlmostEqualFloat( + breakdown.total, + breakdown.idle_penalty + + breakdown.reward_shaping + + breakdown.entry_additive + + breakdown.exit_additive, + tolerance=self.TOL_IDENTITY_RELAXED, + msg="Total should equal sum of components (idle + shaping/additives)", + ) + """Core reward component tests.""" - def test_reward_calculation_scenarios_basic(self): - """Reward calculation scenarios: expected components become non-zero.""" + def test_reward_calculation_component_activation(self): + """Test reward component activation: idle_penalty and exit_component trigger correctly.""" test_cases = [ (Positions.Neutral, Actions.Neutral, "idle_penalty"), (Positions.Long, Actions.Long_exit, "exit_component"), @@ -53,6 +208,7 @@ class TestRewardComponents(RewardSpaceTestBase): self.assertFinite(breakdown.total, name="breakdown.total") def test_efficiency_zero_policy(self): + """Test efficiency zero policy.""" ctx = self.make_ctx( pnl=0.0, trade_duration=1, @@ -68,6 +224,7 @@ class TestRewardComponents(RewardSpaceTestBase): self.assertAlmostEqualFloat(pnl_factor, 1.0, tolerance=self.TOL_GENERIC_EQ) def test_max_idle_duration_candles_logic(self): + """Test max idle duration candles logic.""" params_small = self.base_params(max_idle_duration_candles=50) params_large = self.base_params(max_idle_duration_candles=200) base_factor = self.TEST_BASE_FACTOR @@ -385,6 +542,64 @@ class TestRewardComponents(RewardSpaceTestBase): f"Long/Short asymmetry pnl={pnl}: long={br_long.exit_component}, short={br_short.exit_component}", ) + def test_idle_penalty_fallback_and_proportionality(self): + """Idle penalty fallback denominator & proportional scaling.""" + params = self.base_params(max_idle_duration_candles=None, max_trade_duration_candles=100) + base_factor = 90.0 + profit_target = self.TEST_PROFIT_TARGET + risk_reward_ratio = 1.0 + ctx_a = self.make_ctx( + pnl=0.0, + trade_duration=0, + idle_duration=20, + position=Positions.Neutral, + action=Actions.Neutral, + ) + ctx_b = dataclasses.replace(ctx_a, idle_duration=40) + br_a = calculate_reward( + ctx_a, + params, + base_factor=base_factor, + profit_target=profit_target, + risk_reward_ratio=risk_reward_ratio, + short_allowed=True, + action_masking=True, + ) + br_b = calculate_reward( + ctx_b, + params, + base_factor=base_factor, + profit_target=profit_target, + risk_reward_ratio=risk_reward_ratio, + short_allowed=True, + action_masking=True, + ) + self.assertLess(br_a.idle_penalty, 0.0) + self.assertLess(br_b.idle_penalty, 0.0) + ratio = br_b.idle_penalty / br_a.idle_penalty if br_a.idle_penalty != 0 else None + self.assertIsNotNone(ratio) + if ratio is not None: + self.assertAlmostEqualFloat(abs(ratio), 2.0, tolerance=0.2) + ctx_mid = dataclasses.replace(ctx_a, idle_duration=120) + br_mid = calculate_reward( + ctx_mid, + params, + base_factor=base_factor, + profit_target=profit_target, + risk_reward_ratio=risk_reward_ratio, + short_allowed=True, + action_masking=True, + ) + self.assertLess(br_mid.idle_penalty, 0.0) + idle_penalty_scale = _get_float_param(params, "idle_penalty_scale", 0.5) + idle_penalty_power = _get_float_param(params, "idle_penalty_power", 1.025) + factor = _get_float_param(params, "base_factor", float(base_factor)) + idle_factor = factor * (profit_target * risk_reward_ratio) / 4.0 + observed_ratio = abs(br_mid.idle_penalty) / (idle_factor * idle_penalty_scale) + if observed_ratio > 0: + implied_D = 120 / observed_ratio ** (1 / idle_penalty_power) + self.assertAlmostEqualFloat(implied_D, 400.0, tolerance=20.0) + if __name__ == "__main__": unittest.main() diff --git a/ReforceXY/reward_space_analysis/tests/test_robustness.py b/ReforceXY/reward_space_analysis/tests/test_robustness.py index d402f3d..9ed3988 100644 --- a/ReforceXY/reward_space_analysis/tests/test_robustness.py +++ b/ReforceXY/reward_space_analysis/tests/test_robustness.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 """Robustness tests and boundary condition validation.""" -import dataclasses import math import unittest import warnings @@ -15,7 +14,6 @@ from reward_space_analysis import ( Positions, RewardContext, _get_exit_factor, - _get_float_param, _get_pnl_factor, calculate_reward, simulate_samples, @@ -161,8 +159,9 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): invalid_combinations = df[(df["pnl"].abs() <= self.EPS_BASE) & (df["reward_exit"] != 0)] self.assertEqual(len(invalid_combinations), 0) - def test_exit_factor_mathematical_formulas(self): - """Mathematical correctness of exit factor calculations across modes.""" + def test_exit_factor_comprehensive(self): + """Comprehensive exit factor test: mathematical correctness and monotonic attenuation.""" + # Part 1: Mathematical formulas validation context = self.make_ctx( pnl=0.05, trade_duration=50, @@ -174,6 +173,8 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): ) params = self.DEFAULT_PARAMS.copy() duration_ratio = 50 / 100 + + # Test power mode params["exit_attenuation_mode"] = "power" params["exit_power_tau"] = 0.5 params["exit_plateau"] = False @@ -187,6 +188,8 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): action_masking=True, ) self.assertGreater(reward_power.exit_component, 0) + + # Test half_life mode with mathematical validation params["exit_attenuation_mode"] = "half_life" params["exit_half_life"] = 0.5 reward_half_life = calculate_reward( @@ -212,6 +215,8 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): tolerance=self.TOL_IDENTITY_RELAXED, msg="Half-life attenuation mismatch: observed vs expected", ) + + # Test linear mode params["exit_attenuation_mode"] = "linear" params["exit_linear_slope"] = 1.0 reward_linear = calculate_reward( @@ -232,62 +237,57 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): unique_rewards = set((f"{r:.6f}" for r in rewards)) self.assertGreater(len(unique_rewards), 1) - def test_idle_penalty_fallback_and_proportionality(self): - """Idle penalty fallback denominator & proportional scaling (robustness).""" - params = self.base_params(max_idle_duration_candles=None, max_trade_duration_candles=100) - base_factor = 90.0 - profit_target = self.TEST_PROFIT_TARGET - risk_reward_ratio = 1.0 - ctx_a = self.make_ctx( - pnl=0.0, - trade_duration=0, - idle_duration=20, - position=Positions.Neutral, - action=Actions.Neutral, - ) - ctx_b = dataclasses.replace(ctx_a, idle_duration=40) - br_a = calculate_reward( - ctx_a, - params, - base_factor=base_factor, - profit_target=profit_target, - risk_reward_ratio=risk_reward_ratio, - short_allowed=True, - action_masking=True, - ) - br_b = calculate_reward( - ctx_b, - params, - base_factor=base_factor, - profit_target=profit_target, - risk_reward_ratio=risk_reward_ratio, - short_allowed=True, - action_masking=True, - ) - self.assertLess(br_a.idle_penalty, 0.0) - self.assertLess(br_b.idle_penalty, 0.0) - ratio = br_b.idle_penalty / br_a.idle_penalty if br_a.idle_penalty != 0 else None - self.assertIsNotNone(ratio) - self.assertAlmostEqualFloat(abs(ratio), 2.0, tolerance=0.2) - ctx_mid = dataclasses.replace(ctx_a, idle_duration=120) - br_mid = calculate_reward( - ctx_mid, - params, - base_factor=base_factor, - profit_target=profit_target, - risk_reward_ratio=risk_reward_ratio, - short_allowed=True, - action_masking=True, - ) - self.assertLess(br_mid.idle_penalty, 0.0) - idle_penalty_scale = _get_float_param(params, "idle_penalty_scale", 0.5) - idle_penalty_power = _get_float_param(params, "idle_penalty_power", 1.025) - factor = _get_float_param(params, "base_factor", float(base_factor)) - idle_factor = factor * (profit_target * risk_reward_ratio) / 4.0 - observed_ratio = abs(br_mid.idle_penalty) / (idle_factor * idle_penalty_scale) - if observed_ratio > 0: - implied_D = 120 / observed_ratio ** (1 / idle_penalty_power) - self.assertAlmostEqualFloat(implied_D, 400.0, tolerance=20.0) + # Part 2: Monotonic attenuation validation + modes = list(ATTENUATION_MODES) + ["plateau_linear"] + base_factor = self.TEST_BASE_FACTOR + pnl = 0.05 + pnl_factor = 1.0 + for mode in modes: + with self.subTest(mode=mode): + if mode == "plateau_linear": + mode_params = self.base_params( + exit_attenuation_mode="linear", + exit_plateau=True, + exit_plateau_grace=0.2, + exit_linear_slope=1.0, + ) + elif mode == "linear": + mode_params = self.base_params( + exit_attenuation_mode="linear", exit_linear_slope=1.2 + ) + elif mode == "power": + mode_params = self.base_params( + exit_attenuation_mode="power", exit_power_tau=0.5 + ) + elif mode == "half_life": + mode_params = self.base_params( + exit_attenuation_mode="half_life", exit_half_life=0.7 + ) + else: + mode_params = self.base_params(exit_attenuation_mode="sqrt") + + ratios = np.linspace(0, 2, 15) + values = [ + _get_exit_factor(base_factor, pnl, pnl_factor, r, mode_params) for r in ratios + ] + + if mode == "plateau_linear": + grace = float(mode_params["exit_plateau_grace"]) + filtered = [ + (r, v) + for r, v in zip(ratios, values) + if r >= grace - self.TOL_IDENTITY_RELAXED + ] + values_to_check = [v for _, v in filtered] + else: + values_to_check = values + + for earlier, later in zip(values_to_check, values_to_check[1:]): + self.assertLessEqual( + later, + earlier + self.TOL_IDENTITY_RELAXED, + f"Non-monotonic attenuation in mode={mode}", + ) def test_exit_factor_threshold_warning_and_non_capping(self): """Warning emission without capping when exit_factor_threshold exceeded.""" @@ -388,7 +388,8 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): f"Alpha attenuation mismatch tau={tau} alpha={alpha} obs_ratio={observed_ratio} exp_ratio={expected_ratio}", ) - def test_extreme_parameter_values(self): + def test_reward_calculation_extreme_parameters_stability(self): + """Test reward calculation extreme parameters stability.""" extreme_params = self.base_params(win_reward_factor=1000.0, base_factor=10000.0) context = RewardContext( pnl=0.05, @@ -411,6 +412,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): self.assertFinite(br.total, name="breakdown.total") def test_exit_attenuation_modes_enumeration(self): + """Test exit attenuation modes enumeration.""" modes = ATTENUATION_MODES_WITH_LEGACY for mode in modes: with self.subTest(mode=mode): @@ -436,49 +438,6 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): self.assertFinite(br.exit_component, name="breakdown.exit_component") self.assertFinite(br.total, name="breakdown.total") - def test_exit_factor_monotonic_attenuation(self): - """For attenuation modes: factor should be non-increasing w.r.t duration_ratio. - - Modes covered: sqrt, linear, power, half_life, plateau+linear (after grace). - Legacy is excluded (non-monotonic by design). Plateau+linear includes flat grace then monotonic. - """ - modes = list(ATTENUATION_MODES) + ["plateau_linear"] - base_factor = self.TEST_BASE_FACTOR - pnl = 0.05 - pnl_factor = 1.0 - for mode in modes: - if mode == "plateau_linear": - params = self.base_params( - exit_attenuation_mode="linear", - exit_plateau=True, - exit_plateau_grace=0.2, - exit_linear_slope=1.0, - ) - elif mode == "linear": - params = self.base_params(exit_attenuation_mode="linear", exit_linear_slope=1.2) - elif mode == "power": - params = self.base_params(exit_attenuation_mode="power", exit_power_tau=0.5) - elif mode == "half_life": - params = self.base_params(exit_attenuation_mode="half_life", exit_half_life=0.7) - else: - params = self.base_params(exit_attenuation_mode="sqrt") - ratios = np.linspace(0, 2, 15) - values = [_get_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios] - if mode == "plateau_linear": - grace = float(params["exit_plateau_grace"]) - filtered = [ - (r, v) for r, v in zip(ratios, values) if r >= grace - self.TOL_IDENTITY_RELAXED - ] - values_to_check = [v for _, v in filtered] - else: - values_to_check = values - for earlier, later in zip(values_to_check, values_to_check[1:]): - self.assertLessEqual( - later, - earlier + self.TOL_IDENTITY_RELAXED, - f"Non-monotonic attenuation in mode={mode}", - ) - def test_exit_factor_boundary_parameters(self): """Test parameter edge cases: tau extremes, plateau grace edges, slope zero.""" base_factor = 50.0 @@ -572,6 +531,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): self.assertLess(vals[-1], ref, "Attenuation should begin after grace boundary") def test_plateau_continuity_at_grace_boundary(self): + """Test plateau continuity at grace boundary.""" modes = ["sqrt", "linear", "power", "half_life"] grace = 0.8 eps = self.CONTINUITY_EPS_SMALL diff --git a/ReforceXY/reward_space_analysis/tests/test_statistics.py b/ReforceXY/reward_space_analysis/tests/test_statistics.py index 7695645..837235c 100644 --- a/ReforceXY/reward_space_analysis/tests/test_statistics.py +++ b/ReforceXY/reward_space_analysis/tests/test_statistics.py @@ -124,8 +124,8 @@ class TestStatistics(RewardSpaceTestBase): if key in diagnostics: self.assertFinite(diagnostics[key], name=key) - def test_statistical_functions(self): - """Smoke test statistical_hypothesis_tests on synthetic data (API integration).""" + def test_statistical_hypothesis_tests_api_integration(self): + """Test statistical_hypothesis_tests API integration with synthetic data.""" base = self.make_stats_df(n=200, seed=self.SEED, idle_pattern="mixed") base.loc[:149, ["reward_idle", "reward_hold", "reward_exit"]] = 0.0 results = statistical_hypothesis_tests(base) @@ -388,6 +388,41 @@ class TestStatistics(RewardSpaceTestBase): eff = res["effect_size_epsilon_sq"] self.assertFinite(eff) self.assertGreaterEqual(eff, 0) + + def test_bootstrap_confidence_intervals_bounds_ordering(self): + """Test bootstrap confidence intervals return ordered finite bounds.""" + test_data = self.make_stats_df(n=100, seed=self.SEED) + results = bootstrap_confidence_intervals(test_data, ["reward", "pnl"], n_bootstrap=100) + for metric, (mean, ci_low, ci_high) in results.items(): + self.assertFinite(mean, name=f"mean[{metric}]") + self.assertFinite(ci_low, name=f"ci_low[{metric}]") + self.assertFinite(ci_high, name=f"ci_high[{metric}]") + self.assertLess(ci_low, ci_high) + + def test_stats_bootstrap_shrinkage_with_sample_size(self): + """Bootstrap CI half-width decreases with larger sample (~1/sqrt(n) heuristic).""" + small = self._shift_scale_df(80) + large = self._shift_scale_df(800) + res_small = bootstrap_confidence_intervals(small, ["reward"], n_bootstrap=400) + res_large = bootstrap_confidence_intervals(large, ["reward"], n_bootstrap=400) + _, lo_s, hi_s = list(res_small.values())[0] + _, lo_l, hi_l = list(res_large.values())[0] + hw_small = (hi_s - lo_s) / 2.0 + hw_large = (hi_l - lo_l) / 2.0 + self.assertFinite(hw_small, name="hw_small") + self.assertFinite(hw_large, name="hw_large") + self.assertLess(hw_large, hw_small * 0.55) + + def test_stats_bootstrap_constant_distribution_and_diagnostics(self): + """Bootstrap on degenerate columns produce (mean≈lo≈hi) zero-width intervals.""" + df = self._const_df(80) + res = bootstrap_confidence_intervals( + df, ["reward", "pnl"], n_bootstrap=200, confidence_level=0.95 + ) + for _metric, (mean, lo, hi) in res.items(): + self.assertAlmostEqualFloat(mean, lo, tolerance=2e-09) + self.assertAlmostEqualFloat(mean, hi, tolerance=2e-09) + self.assertLessEqual(hi - lo, 2e-09) if "effect_size_rank_biserial" in res: rb = res["effect_size_rank_biserial"] self.assertFinite(rb) diff --git a/ReforceXY/reward_space_analysis/tests/test_utilities.py b/ReforceXY/reward_space_analysis/tests/test_utilities.py index 2d1931c..c214ace 100644 --- a/ReforceXY/reward_space_analysis/tests/test_utilities.py +++ b/ReforceXY/reward_space_analysis/tests/test_utilities.py @@ -10,17 +10,12 @@ import unittest import warnings from pathlib import Path -import numpy as np import pandas as pd from reward_space_analysis import ( - DEFAULT_MODEL_REWARD_PARAMETERS, PBRS_INVARIANCE_TOL, - _get_float_param, apply_potential_shaping, - bootstrap_confidence_intervals, load_real_episodes, - simulate_samples, ) from .test_base import RewardSpaceTestBase @@ -34,6 +29,7 @@ class TestLoadRealEpisodes(RewardSpaceTestBase): pickle.dump(obj, f) def test_top_level_dict_transitions(self): + """Test top level dict transitions.""" df = pd.DataFrame( { "pnl": [0.01], @@ -52,6 +48,7 @@ class TestLoadRealEpisodes(RewardSpaceTestBase): self.assertEqual(len(loaded), 1) def test_mixed_episode_list_warns_and_flattens(self): + """Test mixed episode list warns and flattens.""" ep1 = {"episode_id": 1} ep2 = { "episode_id": 2, @@ -76,6 +73,7 @@ class TestLoadRealEpisodes(RewardSpaceTestBase): self.assertPlacesEqual(float(loaded.iloc[0]["pnl"]), 0.02, places=7) def test_non_iterable_transitions_raises(self): + """Test non iterable transitions raises.""" bad = {"transitions": 123} p = Path(self.temp_dir) / "bad.pkl" self.write_pickle(bad, p) @@ -83,6 +81,7 @@ class TestLoadRealEpisodes(RewardSpaceTestBase): load_real_episodes(p) def test_enforce_columns_false_fills_na(self): + """Test enforce columns false fills na.""" trans = [ {"pnl": 0.03, "trade_duration": 10, "idle_duration": 0, "position": 1.0, "action": 2.0} ] @@ -93,6 +92,7 @@ class TestLoadRealEpisodes(RewardSpaceTestBase): self.assertTrue(loaded["reward"].isna().all()) def test_casting_numeric_strings(self): + """Test casting numeric strings.""" trans = [ { "pnl": "0.04", @@ -130,104 +130,6 @@ class TestLoadRealEpisodes(RewardSpaceTestBase): self.assertIn("pnl", loaded_data.columns) -class TestBootstrapStatistics(RewardSpaceTestBase): - """Grouped tests for bootstrap confidence interval behavior.""" - - def test_constant_distribution_bootstrap_and_diagnostics(self): - """Degenerate columns produce (mean≈lo≈hi) zero-width intervals.""" - df = self._const_df(80) - res = bootstrap_confidence_intervals( - df, ["reward", "pnl"], n_bootstrap=200, confidence_level=0.95 - ) - for k, (mean, lo, hi) in res.items(): - self.assertAlmostEqualFloat(mean, lo, tolerance=2e-09) - self.assertAlmostEqualFloat(mean, hi, tolerance=2e-09) - self.assertLessEqual(hi - lo, 2e-09) - - def test_bootstrap_shrinkage_with_sample_size(self): - """Half-width decreases with larger sample (~1/sqrt(n) heuristic).""" - small = self._shift_scale_df(80) - large = self._shift_scale_df(800) - res_small = bootstrap_confidence_intervals(small, ["reward"], n_bootstrap=400) - res_large = bootstrap_confidence_intervals(large, ["reward"], n_bootstrap=400) - _, lo_s, hi_s = list(res_small.values())[0] - _, lo_l, hi_l = list(res_large.values())[0] - hw_small = (hi_s - lo_s) / 2.0 - hw_large = (hi_l - lo_l) / 2.0 - self.assertFinite(hw_small, name="hw_small") - self.assertFinite(hw_large, name="hw_large") - self.assertLess(hw_large, hw_small * 0.55) - - def test_bootstrap_confidence_intervals_basic(self): - """Basic CI computation returns ordered finite bounds.""" - test_data = self.make_stats_df(n=100, seed=self.SEED) - results = bootstrap_confidence_intervals(test_data, ["reward", "pnl"], n_bootstrap=100) - for metric, (mean, ci_low, ci_high) in results.items(): - self.assertFinite(mean, name=f"mean[{metric}]") - self.assertFinite(ci_low, name=f"ci_low[{metric}]") - self.assertFinite(ci_high, name=f"ci_high[{metric}]") - self.assertLess(ci_low, ci_high) - - def test_canonical_invariance_flag_and_sum(self): - """Canonical mode + no additives -> pbrs_invariant True and Σ shaping ≈ 0.""" - params = self.base_params( - exit_potential_mode="canonical", - entry_additive_enabled=False, - exit_additive_enabled=False, - hold_potential_enabled=True, - ) - df = simulate_samples( - params={**params, "max_trade_duration_candles": 100}, - num_samples=400, - seed=self.SEED, - base_factor=self.TEST_BASE_FACTOR, - profit_target=self.TEST_PROFIT_TARGET, - risk_reward_ratio=self.TEST_RR, - max_duration_ratio=2.0, - trading_mode="margin", - pnl_base_std=self.TEST_PNL_STD, - pnl_duration_vol_scale=self.TEST_PNL_DUR_VOL_SCALE, - ) - unique_flags = set(df["pbrs_invariant"].unique().tolist()) - self.assertEqual(unique_flags, {True}, f"Unexpected invariant flags: {unique_flags}") - total_shaping = float(df["reward_shaping"].sum()) - self.assertLess( - abs(total_shaping), - PBRS_INVARIANCE_TOL, - f"Canonical invariance violated: Σ shaping = {total_shaping}", - ) - - def test_non_canonical_flag_false_and_sum_nonzero(self): - """Non-canonical exit potential (progressive_release) -> pbrs_invariant False and Σ shaping != 0.""" - params = self.base_params( - exit_potential_mode="progressive_release", - exit_potential_decay=0.25, - entry_additive_enabled=False, - exit_additive_enabled=False, - hold_potential_enabled=True, - ) - df = simulate_samples( - params={**params, "max_trade_duration_candles": 100}, - num_samples=400, - seed=self.SEED, - base_factor=self.TEST_BASE_FACTOR, - profit_target=self.TEST_PROFIT_TARGET, - risk_reward_ratio=self.TEST_RR, - max_duration_ratio=2.0, - trading_mode="margin", - pnl_base_std=self.TEST_PNL_STD, - pnl_duration_vol_scale=self.TEST_PNL_DUR_VOL_SCALE, - ) - unique_flags = set(df["pbrs_invariant"].unique().tolist()) - self.assertEqual(unique_flags, {False}, f"Unexpected invariant flags: {unique_flags}") - total_shaping = float(df["reward_shaping"].sum()) - self.assertGreater( - abs(total_shaping), - PBRS_INVARIANCE_TOL * 10, - f"Expected non-zero Σ shaping in non-canonical mode (got {total_shaping})", - ) - - class TestReportFormatting(RewardSpaceTestBase): """Tests for report formatting elements not covered elsewhere.""" @@ -255,36 +157,6 @@ class TestReportFormatting(RewardSpaceTestBase): "Tolerance constant value should appear, not raw literal", ) - def test_pbrs_non_canonical_report_generation(self): - """Generate synthetic invariance section with non-zero shaping to assert Non-canonical classification.""" - df = pd.DataFrame( - { - "reward_shaping": [0.01, -0.002], - "reward_entry_additive": [0.0, 0.0], - "reward_exit_additive": [0.001, 0.0], - } - ) - total_shaping = df["reward_shaping"].sum() - self.assertGreater(abs(total_shaping), PBRS_INVARIANCE_TOL) - invariance_status = "❌ Non-canonical" - section = [] - section.append("**PBRS Invariance Summary:**\n") - section.append("| Field | Value |\n") - section.append("|-------|-------|\n") - section.append(f"| Invariance | {invariance_status} |\n") - section.append(f"| Note | Total shaping = {total_shaping:.6f} (non-zero) |\n") - section.append(f"| Σ Shaping Reward | {total_shaping:.6f} |\n") - section.append(f"| Abs Σ Shaping Reward | {abs(total_shaping):.6e} |\n") - section.append(f"| Σ Entry Additive | {df['reward_entry_additive'].sum():.6f} |\n") - section.append(f"| Σ Exit Additive | {df['reward_exit_additive'].sum():.6f} |\n") - content = "".join(section) - self.assertIn("❌ Non-canonical", content) - self.assertRegex(content, "Σ Shaping Reward \\| 0\\.008000 \\|") - m_abs = re.search("Abs Σ Shaping Reward \\| ([0-9.]+e[+-][0-9]{2}) \\|", content) - self.assertIsNotNone(m_abs) - if m_abs: - self.assertAlmostEqual(abs(total_shaping), float(m_abs.group(1)), places=12) - def test_additive_activation_deterministic_contribution(self): """Additives enabled increase total reward; shaping impact limited.""" base = self.base_params( @@ -320,117 +192,6 @@ class TestReportFormatting(RewardSpaceTestBase): self.assertLess(abs(s1 - s0), 0.2) self.assertGreater(t1 - _t0, 0.0, "Total reward should increase with additives present") - def test_report_cumulative_invariance_aggregation(self): - """Canonical telescoping term: small per-step mean drift, bounded increments.""" - params = self.base_params( - hold_potential_enabled=True, - entry_additive_enabled=False, - exit_additive_enabled=False, - exit_potential_mode="canonical", - ) - gamma = _get_float_param( - params, "potential_gamma", DEFAULT_MODEL_REWARD_PARAMETERS.get("potential_gamma", 0.95) - ) - rng = np.random.default_rng(321) - last_potential = 0.0 - telescoping_sum = 0.0 - max_abs_step = 0.0 - steps = 0 - for _ in range(500): - is_exit = rng.uniform() < 0.1 - current_pnl = float(rng.normal(0, 0.05)) - current_dur = float(rng.uniform(0, 1)) - next_pnl = 0.0 if is_exit else float(rng.normal(0, 0.05)) - next_dur = 0.0 if is_exit else float(rng.uniform(0, 1)) - _tot, _shap, next_potential = apply_potential_shaping( - base_reward=0.0, - current_pnl=current_pnl, - current_duration_ratio=current_dur, - next_pnl=next_pnl, - next_duration_ratio=next_dur, - is_exit=is_exit, - last_potential=last_potential, - params=params, - ) - inc = gamma * next_potential - last_potential - telescoping_sum += inc - if abs(inc) > max_abs_step: - max_abs_step = abs(inc) - steps += 1 - if is_exit: - last_potential = 0.0 - else: - last_potential = next_potential - mean_drift = telescoping_sum / max(1, steps) - self.assertLess( - abs(mean_drift), - 0.02, - f"Per-step telescoping drift too large (mean={mean_drift}, steps={steps})", - ) - self.assertLessEqual( - max_abs_step, - self.PBRS_MAX_ABS_SHAPING, - f"Unexpected large telescoping increment (max={max_abs_step})", - ) - - def test_report_explicit_non_invariance_progressive_release(self): - """progressive_release should generally yield non-zero cumulative shaping (release leak).""" - params = self.base_params( - hold_potential_enabled=True, - entry_additive_enabled=False, - exit_additive_enabled=False, - exit_potential_mode="progressive_release", - exit_potential_decay=0.25, - ) - rng = np.random.default_rng(321) - last_potential = 0.0 - shaping_sum = 0.0 - for _ in range(160): - is_exit = rng.uniform() < 0.15 - next_pnl = 0.0 if is_exit else float(rng.normal(0, 0.07)) - next_dur = 0.0 if is_exit else float(rng.uniform(0, 1)) - _tot, shap, next_pot = apply_potential_shaping( - base_reward=0.0, - current_pnl=float(rng.normal(0, 0.07)), - current_duration_ratio=float(rng.uniform(0, 1)), - next_pnl=next_pnl, - next_duration_ratio=next_dur, - is_exit=is_exit, - last_potential=last_potential, - params=params, - ) - shaping_sum += shap - last_potential = 0.0 if is_exit else next_pot - self.assertGreater( - abs(shaping_sum), - PBRS_INVARIANCE_TOL * 50, - f"Expected non-zero Σ shaping (got {shaping_sum})", - ) - - def test_gamma_extremes(self): - """Gamma=0 and gamma≈1 boundary behaviours produce bounded shaping and finite potentials.""" - for gamma in [0.0, 0.999999]: - params = self.base_params( - hold_potential_enabled=True, - entry_additive_enabled=False, - exit_additive_enabled=False, - exit_potential_mode="canonical", - potential_gamma=gamma, - ) - _tot, shap, next_pot = apply_potential_shaping( - base_reward=0.0, - current_pnl=0.02, - current_duration_ratio=0.3, - next_pnl=0.025, - next_duration_ratio=0.35, - is_exit=False, - last_potential=0.0, - params=params, - ) - self.assertTrue(np.isfinite(shap)) - self.assertTrue(np.isfinite(next_pot)) - self.assertLessEqual(abs(shap), self.PBRS_MAX_ABS_SHAPING) - class TestCsvAndSimulationOptions(RewardSpaceTestBase): """CLI-level tests: CSV encoding and simulate_unrealized_pnl option effects.""" @@ -464,43 +225,6 @@ class TestCsvAndSimulationOptions(RewardSpaceTestBase): allowed = {0, 1, 2, 3, 4} self.assertTrue(set((int(v) for v in values)).issubset(allowed)) - def test_unrealized_pnl_affects_hold_potential(self): - """--unrealized_pnl should alter hold next_potential distribution vs default.""" - out_default = self.output_path / "sim_default" - out_sim = self.output_path / "sim_unrealized" - base_args = ["--num_samples", "800", "--seed", str(self.SEED), "--out_dir"] - cmd_default = [sys.executable, "reward_space_analysis.py", *base_args, str(out_default)] - res_def = subprocess.run( - cmd_default, capture_output=True, text=True, cwd=Path(__file__).parent.parent - ) - self.assertEqual(res_def.returncode, 0, f"CLI default run failed: {res_def.stderr}") - cmd_sim = [ - sys.executable, - "reward_space_analysis.py", - *base_args, - str(out_sim), - "--unrealized_pnl", - ] - res_sim = subprocess.run( - cmd_sim, capture_output=True, text=True, cwd=Path(__file__).parent.parent - ) - self.assertEqual(res_sim.returncode, 0, f"CLI simulated run failed: {res_sim.stderr}") - df_def = pd.read_csv(out_default / "reward_samples.csv") - df_sim = pd.read_csv(out_sim / "reward_samples.csv") - mask_hold_def = (df_def["action"] == 0) & df_def["position"].isin([0.0, 1.0]) - mask_hold_sim = (df_sim["action"] == 0) & df_sim["position"].isin([0.0, 1.0]) - self.assertGreater(int(mask_hold_def.sum()), 0, "No hold samples in default run") - self.assertGreater(int(mask_hold_sim.sum()), 0, "No hold samples in simulate run") - mean_next_def = float(df_def.loc[mask_hold_def, "next_potential"].mean()) - mean_next_sim = float(df_sim.loc[mask_hold_sim, "next_potential"].mean()) - self.assertFinite(mean_next_def, name="mean_next_def") - self.assertFinite(mean_next_sim, name="mean_next_sim") - self.assertGreater( - abs(mean_next_sim - mean_next_def), - self.TOL_GENERIC_EQ, - f"No detectable effect of --unrealized_pnl on Φ(s): def={mean_next_def:.6f}, sim={mean_next_sim:.6f}", - ) - class TestParamsPropagation(RewardSpaceTestBase): """Integration tests to validate max_trade_duration_candles propagation via CLI params and dynamic flag.""" diff --git a/ReforceXY/reward_space_analysis/uv.lock b/ReforceXY/reward_space_analysis/uv.lock index cffb07d..6e97559 100644 --- a/ReforceXY/reward_space_analysis/uv.lock +++ b/ReforceXY/reward_space_analysis/uv.lock @@ -131,6 +131,11 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ec/16/114df1c291c22cac3b0c127a73e0af5c12ed7bbb6558d310429a0ae24023/coverage-7.10.7-py3-none-any.whl", hash = "sha256:f7941f6f2fe6dd6807a1208737b8a0cbcf1cc6d7b07d24998ad2d63590868260", size = 209952, upload-time = "2025-09-21T20:03:53.918Z" }, ] +[package.optional-dependencies] +toml = [ + { name = "tomli", marker = "python_full_version < '3.10'" }, +] + [[package]] name = "coverage" version = "7.11.0" @@ -235,6 +240,11 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5f/04/642c1d8a448ae5ea1369eac8495740a79eb4e581a9fb0cbdce56bbf56da1/coverage-7.11.0-py3-none-any.whl", hash = "sha256:4b7589765348d78fb4e5fb6ea35d07564e387da2fc5efff62e0222971f155f68", size = 207761, upload-time = "2025-10-15T15:15:06.439Z" }, ] +[package.optional-dependencies] +toml = [ + { name = "tomli", marker = "python_full_version >= '3.10' and python_full_version <= '3.11'" }, +] + [[package]] name = "exceptiongroup" version = "1.3.0" @@ -603,6 +613,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a8/a4/20da314d277121d6534b3a980b29035dcd51e6744bd79075a6ce8fa4eb8d/pytest-8.4.2-py3-none-any.whl", hash = "sha256:872f880de3fc3a5bdc88a11b39c9710c3497a547cfa9320bc3c5e62fbf272e79", size = 365750, upload-time = "2025-09-04T14:34:20.226Z" }, ] +[[package]] +name = "pytest-cov" +version = "7.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "coverage", version = "7.10.7", source = { registry = "https://pypi.org/simple" }, extra = ["toml"], marker = "python_full_version < '3.10'" }, + { name = "coverage", version = "7.11.0", source = { registry = "https://pypi.org/simple" }, extra = ["toml"], marker = "python_full_version >= '3.10'" }, + { name = "pluggy" }, + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5e/f7/c933acc76f5208b3b00089573cf6a2bc26dc80a8aece8f52bb7d6b1855ca/pytest_cov-7.0.0.tar.gz", hash = "sha256:33c97eda2e049a0c5298e91f519302a1334c26ac65c1a483d6206fd458361af1", size = 54328, upload-time = "2025-09-09T10:57:02.113Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0" @@ -646,6 +671,7 @@ dev = [ { name = "coverage", version = "7.10.7", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, { name = "coverage", version = "7.11.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, { name = "pytest" }, + { name = "pytest-cov" }, { name = "ruff" }, ] @@ -662,6 +688,7 @@ requires-dist = [ dev = [ { name = "coverage" }, { name = "pytest", specifier = ">=6.0" }, + { name = "pytest-cov", specifier = ">=7.0.0" }, { name = "ruff" }, ] -- 2.43.0