From a9549ba4099138e7aad50ba44232a008a88e4629 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 15 Oct 2025 20:48:47 +0200 Subject: [PATCH] test(reforcexy): factor out params building MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .../test_reward_space_analysis.py | 316 ++++++++---------- 1 file changed, 137 insertions(+), 179 deletions(-) diff --git a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py index fa249e2..f7b12ea 100644 --- a/ReforceXY/reward_space_analysis/test_reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/test_reward_space_analysis.py @@ -73,16 +73,6 @@ except ImportError as e: sys.exit(1) -def base_params(**overrides) -> dict: - """Return a fresh copy of DEFAULT_MODEL_REWARD_PARAMETERS with overrides applied. - - Ensures tests do not mutate the global defaults inadvertently. - """ - params = DEFAULT_MODEL_REWARD_PARAMETERS.copy() - params.update(overrides) - return params - - class RewardSpaceTestBase(unittest.TestCase): """Base class with common test utilities.""" @@ -101,10 +91,15 @@ class RewardSpaceTestBase(unittest.TestCase): def setUp(self): """Set up test fixtures with reproducible random seed.""" - np.random.seed(self.SEED) + # Unified seeding for numpy + random + self.seed_all(self.SEED) self.temp_dir = tempfile.mkdtemp() self.output_path = Path(self.temp_dir) + def tearDown(self): + """Clean up temporary files.""" + shutil.rmtree(self.temp_dir, ignore_errors=True) + # PBRS structural test constants PBRS_TERMINAL_TOL = 1e-12 PBRS_MAX_ABS_SHAPING = 5.0 @@ -156,6 +151,12 @@ class RewardSpaceTestBase(unittest.TestCase): action=action, ) + def base_params(self, **overrides) -> dict: + """Return fresh copy of default reward params with overrides.""" + params = DEFAULT_MODEL_REWARD_PARAMETERS.copy() + params.update(overrides) + return params + def _canonical_sweep( self, params: dict, @@ -240,7 +241,7 @@ class RewardSpaceTestBase(unittest.TestCase): pnl, trade_duration, idle_duration, position. Guarantees: no NaN; reward_idle==0 where idle_duration==0. """ if seed is not None: - np.random.seed(seed) + self.seed_all(seed) pnl_std_eff = self.TEST_PNL_STD if pnl_std is None else pnl_std reward_total = np.random.normal(reward_total_mean, reward_total_std, n) pnl = np.random.normal(pnl_mean, pnl_std_eff, n) @@ -256,7 +257,7 @@ class RewardSpaceTestBase(unittest.TestCase): idle_duration = np.zeros(n) else: # all_nonzero idle_duration = np.random.uniform(5, 60, n) - # Component rewards (basic synthetic shaping consistent with sign expectations) + # Component rewards reward_idle = np.where(idle_duration > 0, np.random.normal(-1, 0.3, n), 0.0) reward_hold = np.random.normal(-0.5, 0.2, n) reward_exit = np.random.normal(0.8, 0.6, n) @@ -274,10 +275,6 @@ class RewardSpaceTestBase(unittest.TestCase): } ) - def tearDown(self): - """Clean up temporary files.""" - shutil.rmtree(self.temp_dir, ignore_errors=True) - def assertAlmostEqualFloat( self, first: Union[float, int], @@ -572,7 +569,7 @@ class TestStatistics(RewardSpaceTestBase): def _make_idle_variance_df(self, n: int = 100) -> pd.DataFrame: """Synthetic dataframe focusing on idle_duration ↔ reward_idle correlation.""" - np.random.seed(self.SEED) + self.seed_all(self.SEED) idle_duration = np.random.exponential(10, n) reward_idle = -0.01 * idle_duration + np.random.normal(0, 0.001, n) return pd.DataFrame( @@ -936,7 +933,7 @@ class TestStatistics(RewardSpaceTestBase): def test_stats_distribution_metrics_mathematical_bounds(self): """Mathematical bounds and validity of distribution shift metrics.""" - np.random.seed(self.SEED) + self.seed_all(self.SEED) df1 = pd.DataFrame( { "pnl": np.random.normal(0, self.TEST_PNL_STD, 500), @@ -1144,17 +1141,15 @@ class TestRewardComponents(RewardSpaceTestBase): position=Positions.Long, action=Actions.Long_exit, ) - params = self.DEFAULT_PARAMS.copy() + params = self.base_params() profit_target = self.TEST_PROFIT_TARGET * self.TEST_RR pnl_factor = _get_pnl_factor(params, ctx, profit_target, self.TEST_RR) self.assertFinite(pnl_factor, name="pnl_factor") self.assertAlmostEqualFloat(pnl_factor, 1.0, tolerance=self.TOL_GENERIC_EQ) def test_max_idle_duration_candles_logic(self): - params_small = self.DEFAULT_PARAMS.copy() - params_large = self.DEFAULT_PARAMS.copy() - params_small["max_idle_duration_candles"] = 50 - params_large["max_idle_duration_candles"] = 200 + params_small = self.base_params(max_idle_duration_candles=50) + params_large = self.base_params(max_idle_duration_candles=200) base_factor = self.TEST_BASE_FACTOR context = self.make_ctx( pnl=0.0, @@ -1189,16 +1184,12 @@ class TestRewardComponents(RewardSpaceTestBase): self.assertGreater(large.idle_penalty, small.idle_penalty) def test_exit_factor_calculation(self): - """Test exit factor calculation consistency across core modes + plateau variant. - - Plateau behavior expressed via exit_plateau=True with a base kernel (e.g. linear). - """ + """Exit factor calculation across core modes + plateau variant (plateau via exit_plateau=True).""" # Core attenuation kernels (excluding legacy which is step-based) modes_to_test = ["linear", "power"] for mode in modes_to_test: - test_params = self.DEFAULT_PARAMS.copy() - test_params["exit_attenuation_mode"] = mode + test_params = self.base_params(exit_attenuation_mode=mode) factor = _get_exit_factor( base_factor=1.0, pnl=0.02, @@ -1209,15 +1200,12 @@ class TestRewardComponents(RewardSpaceTestBase): self.assertFinite(factor, name=f"exit_factor[{mode}]") self.assertGreater(factor, 0, f"Exit factor for {mode} should be positive") - # Plateau+linear variant sanity check (grace region at 0.5) - plateau_params = self.DEFAULT_PARAMS.copy() - plateau_params.update( - { - "exit_attenuation_mode": "linear", - "exit_plateau": True, - "exit_plateau_grace": 0.5, - "exit_linear_slope": 1.0, - } + # Plateau+linear variant (grace region 0.5) + plateau_params = self.base_params( + exit_attenuation_mode="linear", + exit_plateau=True, + exit_plateau_grace=0.5, + exit_linear_slope=1.0, ) plateau_factor_pre = _get_exit_factor( base_factor=1.0, @@ -1274,16 +1262,13 @@ class TestRewardComponents(RewardSpaceTestBase): win_reward_factor = 3.0 # asymptote = 4.0 beta = 0.5 profit_target = self.TEST_PROFIT_TARGET - params = self.DEFAULT_PARAMS.copy() - params.update( - { - "win_reward_factor": win_reward_factor, - "pnl_factor_beta": beta, - "efficiency_weight": 0.0, # disable efficiency modulation - "exit_attenuation_mode": "linear", - "exit_plateau": False, - "exit_linear_slope": 0.0, # keep attenuation = 1 - } + params = self.base_params( + win_reward_factor=win_reward_factor, + pnl_factor_beta=beta, + efficiency_weight=0.0, # disable efficiency modulation + exit_attenuation_mode="linear", + exit_plateau=False, + exit_linear_slope=0.0, # keep attenuation = 1 ) # Ensure provided base_factor=1.0 is actually used (remove default 100) params.pop("base_factor", None) @@ -1352,9 +1337,8 @@ class TestRewardComponents(RewardSpaceTestBase): def test_scale_invariance_and_decomposition(self): """Components scale ~ linearly with base_factor; total equals sum(core + shaping + additives).""" - params = self.DEFAULT_PARAMS.copy() - # Remove internal base_factor so the explicit argument is used - params.pop("base_factor", None) + params = self.base_params() + params.pop("base_factor", None) # explicit base_factor argument below base_factor = 80.0 k = 7.5 profit_target = self.TEST_PROFIT_TARGET @@ -1480,7 +1464,7 @@ class TestRewardComponents(RewardSpaceTestBase): def test_long_short_symmetry(self): """Long vs Short exit reward magnitudes should match in absolute value for identical PnL (no directional bias).""" - params = self.DEFAULT_PARAMS.copy() + params = self.base_params() params.pop("base_factor", None) base_factor = 120.0 profit_target = 0.04 @@ -2018,7 +2002,7 @@ class TestPrivateFunctions(RewardSpaceTestBase): Uses a very large base_factor to trigger potential warning condition without capping. """ - params = self.DEFAULT_PARAMS.copy() + params = self.base_params() self.assertIn("check_invariants", params) self.assertIn("exit_factor_threshold", params) @@ -2112,16 +2096,12 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): active_label: str = sc["active"] # type: ignore[index] with self.subTest(active=active_label): # Build parameters disabling shaping and additives to enforce strict decomposition - params_local = self.DEFAULT_PARAMS.copy() - params_local.update( - { - "entry_additive_enabled": False, - "exit_additive_enabled": False, - "hold_potential_enabled": False, - "potential_gamma": 0.0, - # Ensure any invariance flags do not add shaping - "check_invariants": False, - } + params_local = self.base_params( + entry_additive_enabled=False, + exit_additive_enabled=False, + hold_potential_enabled=False, + potential_gamma=0.0, + check_invariants=False, ) br = calculate_reward( ctx_obj, @@ -2259,8 +2239,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): def test_idle_penalty_fallback_and_proportionality(self): """Idle penalty fallback denominator & proportional scaling (robustness).""" - params = self.DEFAULT_PARAMS.copy() - params["max_idle_duration_candles"] = None + params = self.base_params(max_idle_duration_candles=None) base_factor = 90.0 profit_target = self.TEST_PROFIT_TARGET risk_reward_ratio = 1.0 @@ -2320,7 +2299,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): def test_exit_factor_threshold_warning_and_non_capping(self): """Warning emission without capping when exit_factor_threshold exceeded.""" - params = base_params(exit_factor_threshold=10.0) + params = self.base_params(exit_factor_threshold=10.0) params.pop("base_factor", None) context = self.make_ctx( pnl=0.08, @@ -2376,10 +2355,10 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): pnl = 0.03 pnl_factor = 1.0 duration_ratios = [0.0, 0.2, 0.5, 1.0, 1.5] - params_bad = base_params( + params_bad = self.base_params( exit_attenuation_mode="linear", exit_linear_slope=-5.0, exit_plateau=False ) - params_ref = base_params( + params_ref = self.base_params( exit_attenuation_mode="linear", exit_linear_slope=1.0, exit_plateau=False ) for dr in duration_ratios: @@ -2405,7 +2384,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): 1.0, ] # include boundary 1.0 => alpha=0 per formula? actually -> -log(1)/log2 = 0 for tau in taus: - params = base_params( + params = self.base_params( exit_attenuation_mode="power", exit_power_tau=tau, exit_plateau=False ) f0 = _get_exit_factor(base_factor, pnl, pnl_factor, 0.0, params) @@ -2426,9 +2405,10 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): # Boundary condition tests (extremes / continuity / monotonicity) def test_extreme_parameter_values(self): - extreme_params = self.DEFAULT_PARAMS.copy() - extreme_params["win_reward_factor"] = 1000.0 - extreme_params["base_factor"] = 10000.0 + extreme_params = self.base_params( + win_reward_factor=1000.0, + base_factor=10000.0, + ) context = RewardContext( pnl=0.05, trade_duration=50, @@ -2454,8 +2434,7 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): modes = ATTENUATION_MODES_WITH_LEGACY for mode in modes: with self.subTest(mode=mode): - test_params = self.DEFAULT_PARAMS.copy() - test_params["exit_attenuation_mode"] = mode + test_params = self.base_params(exit_attenuation_mode=mode) ctx = RewardContext( pnl=0.02, trade_duration=50, @@ -2490,20 +2469,30 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): pnl = 0.05 pnl_factor = 1.0 for mode in modes: - params = self.DEFAULT_PARAMS.copy() - if mode in ("sqrt", "linear", "power", "half_life"): - params["exit_attenuation_mode"] = mode - if mode == "linear": - params["exit_linear_slope"] = 1.2 if mode == "plateau_linear": - params["exit_attenuation_mode"] = "linear" - params["exit_plateau"] = True - params["exit_plateau_grace"] = 0.2 - params["exit_linear_slope"] = 1.0 - if mode == "power": - params["exit_power_tau"] = 0.5 - if mode == "half_life": - params["exit_half_life"] = 0.7 + params = self.base_params( + exit_attenuation_mode="linear", + exit_plateau=True, + exit_plateau_grace=0.2, + exit_linear_slope=1.0, + ) + elif mode == "linear": + params = self.base_params( + exit_attenuation_mode="linear", + exit_linear_slope=1.2, + ) + elif mode == "power": + params = self.base_params( + exit_attenuation_mode="power", + exit_power_tau=0.5, + ) + elif mode == "half_life": + params = self.base_params( + exit_attenuation_mode="half_life", + exit_half_life=0.7, + ) + else: # sqrt + params = self.base_params(exit_attenuation_mode="sqrt") ratios = np.linspace(0, 2, 15) values = [ @@ -2535,14 +2524,12 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): pnl = 0.02 pnl_factor = 1.0 # Tau near 1 (minimal attenuation) vs tau near 0 (strong attenuation) - params_hi = self.DEFAULT_PARAMS.copy() - params_hi.update({"exit_attenuation_mode": "power", "exit_power_tau": 0.999999}) - params_lo = self.DEFAULT_PARAMS.copy() - params_lo.update( - { - "exit_attenuation_mode": "power", - "exit_power_tau": self.MIN_EXIT_POWER_TAU, - } + params_hi = self.base_params( + exit_attenuation_mode="power", exit_power_tau=0.999999 + ) + params_lo = self.base_params( + exit_attenuation_mode="power", + exit_power_tau=self.MIN_EXIT_POWER_TAU, ) r = 1.5 hi_val = _get_exit_factor(base_factor, pnl, pnl_factor, r, params_hi) @@ -2553,23 +2540,17 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): "Power mode: higher tau (≈1) should attenuate less than tiny tau", ) # Plateau grace 0 vs 1 - params_g0 = self.DEFAULT_PARAMS.copy() - params_g0.update( - { - "exit_attenuation_mode": "linear", - "exit_plateau": True, - "exit_plateau_grace": 0.0, - "exit_linear_slope": 1.0, - } - ) - params_g1 = self.DEFAULT_PARAMS.copy() - params_g1.update( - { - "exit_attenuation_mode": "linear", - "exit_plateau": True, - "exit_plateau_grace": 1.0, - "exit_linear_slope": 1.0, - } + params_g0 = self.base_params( + exit_attenuation_mode="linear", + exit_plateau=True, + exit_plateau_grace=0.0, + exit_linear_slope=1.0, + ) + params_g1 = self.base_params( + exit_attenuation_mode="linear", + exit_plateau=True, + exit_plateau_grace=1.0, + exit_linear_slope=1.0, ) val_g0 = _get_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g0) val_g1 = _get_exit_factor(base_factor, pnl, pnl_factor, 0.5, params_g1) @@ -2580,21 +2561,15 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): "Plateau grace=1.0 should delay attenuation vs grace=0.0", ) # Linear slope zero vs positive - params_lin0 = self.DEFAULT_PARAMS.copy() - params_lin0.update( - { - "exit_attenuation_mode": "linear", - "exit_linear_slope": 0.0, - "exit_plateau": False, - } + params_lin0 = self.base_params( + exit_attenuation_mode="linear", + exit_linear_slope=0.0, + exit_plateau=False, ) - params_lin1 = self.DEFAULT_PARAMS.copy() - params_lin1.update( - { - "exit_attenuation_mode": "linear", - "exit_linear_slope": 2.0, - "exit_plateau": False, - } + params_lin1 = self.base_params( + exit_attenuation_mode="linear", + exit_linear_slope=2.0, + exit_plateau=False, ) val_lin0 = _get_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin0) val_lin1 = _get_exit_factor(base_factor, pnl, pnl_factor, 1.0, params_lin1) @@ -2607,14 +2582,11 @@ class TestRewardRobustnessAndBoundaries(RewardSpaceTestBase): def test_plateau_linear_slope_zero_constant_after_grace(self): """Plateau+linear slope=0 should yield flat factor after grace boundary (no attenuation).""" - params = self.DEFAULT_PARAMS.copy() - params.update( - { - "exit_attenuation_mode": "linear", - "exit_plateau": True, - "exit_plateau_grace": 0.3, - "exit_linear_slope": 0.0, - } + params = self.base_params( + exit_attenuation_mode="linear", + exit_plateau=True, + exit_plateau_grace=0.3, + exit_linear_slope=0.0, ) base_factor = self.TEST_BASE_FACTOR pnl = 0.04 @@ -3077,14 +3049,11 @@ class TestPBRS(RewardSpaceTestBase): self.assertEqual(val, 0.0) def test_exit_potential_canonical(self): - params = DEFAULT_MODEL_REWARD_PARAMETERS.copy() - params.update( - { - "exit_potential_mode": "canonical", - "hold_potential_enabled": True, - "entry_additive_enabled": True, # expected to be auto-disabled by canonical invariance - "exit_additive_enabled": True, # expected to be auto-disabled by canonical invariance - } + params = self.base_params( + exit_potential_mode="canonical", + hold_potential_enabled=True, + entry_additive_enabled=True, # expected to be auto-disabled by canonical invariance + exit_additive_enabled=True, # expected to be auto-disabled by canonical invariance ) base_reward = 0.25 current_pnl = 0.05 @@ -3130,14 +3099,11 @@ class TestPBRS(RewardSpaceTestBase): def test_pbrs_invariance_internal_flag_set(self): """Canonical path sets _pbrs_invariance_applied once; second call idempotent.""" - params = DEFAULT_MODEL_REWARD_PARAMETERS.copy() - params.update( - { - "exit_potential_mode": "canonical", - "hold_potential_enabled": True, - "entry_additive_enabled": True, # will be auto-disabled - "exit_additive_enabled": True, - } + params = self.base_params( + exit_potential_mode="canonical", + hold_potential_enabled=True, + entry_additive_enabled=True, # will be auto-disabled + exit_additive_enabled=True, ) # Structural sweep (ensures terminal Φ'==0 and shaping bounded) terminal_next_potentials, shaping_values = self._canonical_sweep(params) @@ -3185,13 +3151,10 @@ class TestPBRS(RewardSpaceTestBase): def test_progressive_release_negative_decay_clamped(self): """Negative decay must clamp to 0 => next potential equals last potential (no release).""" - params = DEFAULT_MODEL_REWARD_PARAMETERS.copy() - params.update( - { - "exit_potential_mode": "progressive_release", - "exit_potential_decay": -0.75, # clamped to 0 - "hold_potential_enabled": True, - } + params = self.base_params( + exit_potential_mode="progressive_release", + exit_potential_decay=-0.75, # clamped to 0 + hold_potential_enabled=True, ) last_potential = 0.42 # Use neutral current state so Φ(s) ≈ 0 (approx) if transforms remain small. @@ -3219,11 +3182,10 @@ class TestPBRS(RewardSpaceTestBase): def test_potential_gamma_nan_fallback(self): """potential_gamma=NaN should fall back to default value (indirect comparison).""" - base_params = DEFAULT_MODEL_REWARD_PARAMETERS.copy() - default_gamma = base_params.get("potential_gamma", 0.95) - params_nan = base_params.copy() - params_nan.update( - {"potential_gamma": float("nan"), "hold_potential_enabled": True} + base_params_dict = self.base_params() + default_gamma = base_params_dict.get("potential_gamma", 0.95) + params_nan = self.base_params( + potential_gamma=float("nan"), hold_potential_enabled=True ) # Non-terminal transition so Φ(s') is computed and depends on gamma res_nan = apply_potential_shaping( @@ -3236,9 +3198,8 @@ class TestPBRS(RewardSpaceTestBase): last_potential=0.0, params=params_nan, ) - params_ref = base_params.copy() - params_ref.update( - {"potential_gamma": default_gamma, "hold_potential_enabled": True} + params_ref = self.base_params( + potential_gamma=default_gamma, hold_potential_enabled=True ) res_ref = apply_potential_shaping( base_reward=0.1, @@ -3283,15 +3244,12 @@ class TestPBRS(RewardSpaceTestBase): def test_pbrs_retain_previous_cumulative_drift(self): """retain_previous mode accumulates negative shaping drift (non-invariant).""" - params = DEFAULT_MODEL_REWARD_PARAMETERS.copy() - params.update( - { - "exit_potential_mode": "retain_previous", - "hold_potential_enabled": True, - "entry_additive_enabled": False, - "exit_additive_enabled": False, - "potential_gamma": 0.9, - } + params = self.base_params( + exit_potential_mode="retain_previous", + hold_potential_enabled=True, + entry_additive_enabled=False, + exit_additive_enabled=False, + potential_gamma=0.9, ) gamma = _get_float_param( params, @@ -3433,7 +3391,7 @@ class TestReportFormatting(RewardSpaceTestBase): """Additives enabled increase total reward; shaping impact limited.""" # Use a non-canonical exit mode to avoid automatic invariance enforcement # disabling the additive components on first call (canonical path auto-disables). - base = base_params( + base = self.base_params( hold_potential_enabled=True, entry_additive_enabled=False, exit_additive_enabled=False, @@ -3472,7 +3430,7 @@ class TestReportFormatting(RewardSpaceTestBase): def test_report_cumulative_invariance_aggregation(self): """Canonical telescoping term: small per-step mean drift, bounded increments.""" - params = base_params( + params = self.base_params( hold_potential_enabled=True, entry_additive_enabled=False, exit_additive_enabled=False, @@ -3529,7 +3487,7 @@ class TestReportFormatting(RewardSpaceTestBase): def test_report_explicit_non_invariance_progressive_release(self): """progressive_release should generally yield non-zero cumulative shaping (release leak).""" - params = base_params( + params = self.base_params( hold_potential_enabled=True, entry_additive_enabled=False, exit_additive_enabled=False, @@ -3564,7 +3522,7 @@ class TestReportFormatting(RewardSpaceTestBase): def test_gamma_extremes(self): """Gamma=0 and gamma≈1 boundary behaviours produce bounded shaping and finite potentials.""" for gamma in [0.0, 0.999999]: - params = base_params( + params = self.base_params( hold_potential_enabled=True, entry_additive_enabled=False, exit_additive_enabled=False, -- 2.43.0