Piment Noir Git Repositories - freqai-strategies.git/commitdiff
test(reforcexy): improve coverage
author Jérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 15 Oct 2025 22:54:38 +0000 (00:54 +0200)
committer Jérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 15 Oct 2025 22:54:38 +0000 (00:54 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
ReforceXY/reward_space_analysis/test_reward_space_analysis.py

index c1f99b8259f31e5fb44c808c8b15581b723fa9df..173f93ab0852de6dfc81e3bd8ca8209513a80e8e 100644 (file)
@@ -52,6 +52,7 @@ try:
         _get_bool_param,
         _get_exit_factor,
         _get_float_param,
+        _get_int_param,
         _get_pnl_factor,
         _get_str_param,
         apply_potential_shaping,
@@ -448,6 +449,31 @@ class RewardSpaceTestBase(unittest.TestCase):
         np.random.seed(seed)
         random.seed(seed)
 
+    # Shared helper data generators (moved here for subclass availability)
+    def _const_df(self, n: int = 64) -> pd.DataFrame:
+        return pd.DataFrame(
+            {
+                "reward_total": np.ones(n) * 0.5,
+                "pnl": np.zeros(n),
+                "trade_duration": np.ones(n) * 10,
+                "idle_duration": np.ones(n) * 3,
+            }
+        )
+
+    def _shift_scale_df(
+        self, n: int = 256, shift: float = 0.0, scale: float = 1.0
+    ) -> pd.DataFrame:
+        rng = np.random.default_rng(123)
+        base = rng.normal(0, 1, n)
+        return pd.DataFrame(
+            {
+                "reward_total": shift + scale * base,
+                "pnl": shift + scale * base * 0.2,
+                "trade_duration": rng.exponential(20, n),
+                "idle_duration": rng.exponential(10, n),
+            }
+        )
+
 
 class TestIntegration(RewardSpaceTestBase):
     """CLI + file output integration tests."""
@@ -705,31 +731,6 @@ class TestStatistics(RewardSpaceTestBase):
         results = statistical_hypothesis_tests(base)
         self.assertIsInstance(results, dict)
 
-    # Helper data generators
-    def _const_df(self, n: int = 64) -> pd.DataFrame:
-        return pd.DataFrame(
-            {
-                "reward_total": np.ones(n) * 0.5,
-                "pnl": np.zeros(n),
-                "trade_duration": np.ones(n) * 10,
-                "idle_duration": np.ones(n) * 3,
-            }
-        )
-
-    def _shift_scale_df(
-        self, n: int = 256, shift: float = 0.0, scale: float = 1.0
-    ) -> pd.DataFrame:
-        rng = np.random.default_rng(123)
-        base = rng.normal(0, 1, n)
-        return pd.DataFrame(
-            {
-                "reward_total": shift + scale * base,
-                "pnl": shift + scale * base * 0.2,
-                "trade_duration": rng.exponential(20, n),
-                "idle_duration": rng.exponential(10, n),
-            }
-        )
-
     def test_stats_constant_distribution_bootstrap_and_diagnostics(self):
         """Bootstrap on constant columns (degenerate)."""
         df = self._const_df(80)
@@ -762,7 +763,7 @@ class TestStatistics(RewardSpaceTestBase):
         )
 
     def test_stats_js_distance_symmetry_helper(self):
-        """Symmetry helper assertion for JS distance."""
+        """JS distance properties: symmetry d(P,Q)=d(Q,P) and upper bound sqrt(log 2)."""
         rng = np.random.default_rng(777)
         p_raw = rng.uniform(0.0, 1.0, size=400)
         q_raw = rng.uniform(0.0, 1.0, size=400)
@@ -778,9 +779,11 @@ class TestStatistics(RewardSpaceTestBase):
             js_div = 0.5 * _kl(a, m) + 0.5 * _kl(b, m)
             return math.sqrt(max(js_div, 0.0))
 
+        # Symmetry
         self.assertSymmetric(
             js_distance, p, q, atol=self.TOL_IDENTITY_STRICT, rtol=self.TOL_RELATIVE
         )
+        # Upper bound
         self.assertLessEqual(
             js_distance(p, q), self.JS_DISTANCE_UPPER_BOUND + self.TOL_IDENTITY_STRICT
         )
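A quick numeric sketch of why the ceiling is sqrt(log 2) (editorial illustration, reusing the natural-log KL convention of the _kl helper above): for distributions with disjoint supports the JS divergence attains its maximum log 2, so the distance attains sqrt(log 2) ≈ 0.8326.

import math
import numpy as np

p = np.array([1.0, 0.0])
q = np.array([0.0, 1.0])
m = 0.5 * (p + q)

def _kl(a, b):
    mask = (a > 0) & (b > 0)
    return float(np.sum(a[mask] * np.log(a[mask] / b[mask])))

js_div = 0.5 * _kl(p, m) + 0.5 * _kl(q, m)  # equals log 2 for disjoint supports
print(math.sqrt(js_div))  # 0.8325546111576977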
@@ -3232,6 +3235,115 @@ class TestPBRS(RewardSpaceTestBase):
             "Unexpected total difference under gamma NaN fallback",
         )
 
+    def test_validate_reward_parameters_success_and_failure(self):
+        """validate_reward_parameters: success on defaults and failure on invalid ranges."""
+        params_ok = DEFAULT_MODEL_REWARD_PARAMETERS.copy()
+        try:
+            validated = validate_reward_parameters(params_ok)
+        except Exception as e:  # pragma: no cover
+            self.fail(f"validate_reward_parameters raised unexpectedly: {e}")
+        # validate_reward_parameters may return (params, diagnostics) or just params
+        if (
+            isinstance(validated, tuple)
+            and len(validated) >= 1
+            and isinstance(validated[0], dict)
+        ):
+            validated_params = validated[0]
+        else:
+            validated_params = validated  # type: ignore[assignment]
+        for k in ("potential_gamma", "hold_potential_enabled", "exit_potential_mode"):
+            self.assertIn(k, validated_params, f"Missing key '{k}' in validated params")
+
+        # Introduce invalid values
+        params_bad = params_ok.copy()
+        params_bad["potential_gamma"] = -0.2  # invalid
+        params_bad["hold_potential_scale"] = -5.0  # invalid
+        # Note: self.fail raises AssertionError, which assertRaises would itself
+        # swallow, so verify the raise explicitly instead of nesting the check.
+        raised = False
+        try:
+            validate_reward_parameters(params_bad)
+        except (ValueError, AssertionError):
+            raised = True
+        self.assertTrue(
+            raised, "validate_reward_parameters should raise on invalid params"
+        )
+
+    def test_compute_exit_potential_mode_differences(self):
+        """_compute_exit_potential modes: canonical resets Φ; spike_cancel approx preserves γΦ' ≈ Φ_prev (delta≈0)."""
+        gamma = 0.93
+        base_common = dict(
+            hold_potential_enabled=True,
+            potential_gamma=gamma,
+            entry_additive_enabled=False,
+            exit_additive_enabled=False,
+            hold_potential_scale=1.0,
+        )
+        # Build previous potential via hold computation
+        ctx_pnl = 0.012
+        ctx_dur_ratio = 0.3
+        params_can = self.base_params(exit_potential_mode="canonical", **base_common)
+        prev_phi = _compute_hold_potential(ctx_pnl, ctx_dur_ratio, params_can)
+        self.assertFinite(prev_phi, name="prev_phi")
+        # Canonical exit potential -> next_phi=0
+        next_phi_can = _compute_exit_potential(prev_phi, params_can)
+        self.assertAlmostEqualFloat(
+            next_phi_can,
+            0.0,
+            tolerance=self.TOL_IDENTITY_STRICT,
+            msg="Canonical exit must zero potential",
+        )
+        canonical_delta = gamma * next_phi_can - prev_phi  # ≈ γ*0 - prev_phi
+        self.assertAlmostEqualFloat(
+            canonical_delta,
+            -prev_phi,
+            tolerance=self.TOL_IDENTITY_RELAXED,
+            msg="Canonical delta mismatch",
+        )
+        # Spike cancel mode
+        params_spike = self.base_params(
+            exit_potential_mode="spike_cancel", **base_common
+        )
+        next_phi_spike = _compute_exit_potential(prev_phi, params_spike)
+        shaping_spike = gamma * next_phi_spike - prev_phi
+        self.assertNearZero(
+            shaping_spike,
+            atol=self.TOL_IDENTITY_RELAXED,
+            msg="Spike cancel should nullify shaping delta",
+        )
+        # Magnitude comparison
+        self.assertGreaterEqual(
+            abs(canonical_delta) + self.TOL_IDENTITY_STRICT,
+            abs(shaping_spike),
+            "Canonical shaping magnitude should be at least spike_cancel's",
+        )
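        # Editorial sketch of the PBRS shaping delta F = gamma * phi_next - phi_prev
        # exercised above (assumption: spike_cancel chooses phi_next = phi_prev / gamma):
        #   canonical:    F = gamma * 0 - phi_prev                  = -phi_prev
        #   spike_cancel: F = gamma * (phi_prev / gamma) - phi_prev = 0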
+
+    def test_get_int_param_coercions(self):
+        """Robust coercion paths of _get_int_param (bool/int/float/str/None/unsupported)."""
+        # None with numeric default
+        self.assertEqual(_get_int_param({"k": None}, "k", 5), 5)
+        # None with non-numeric default -> 0 fallback
+        self.assertEqual(_get_int_param({"k": None}, "k", "x"), 0)
+        # Booleans
+        self.assertEqual(_get_int_param({"k": True}, "k", 0), 1)
+        self.assertEqual(_get_int_param({"k": False}, "k", 7), 0)
+        # Int passthrough
+        self.assertEqual(_get_int_param({"k": -12}, "k", 0), -12)
+        # Float truncation & negative
+        self.assertEqual(_get_int_param({"k": 9.99}, "k", 0), 9)
+        self.assertEqual(_get_int_param({"k": -3.7}, "k", 0), -3)
+        # Non-finite floats fallback
+        self.assertEqual(_get_int_param({"k": float("nan")}, "k", 4), 4)
+        self.assertEqual(_get_int_param({"k": float("inf")}, "k", 4), 4)
+        # String numerics (int, float, exponent)
+        self.assertEqual(_get_int_param({"k": "42"}, "k", 0), 42)
+        self.assertEqual(_get_int_param({"k": " 17 "}, "k", 0), 17)
+        self.assertEqual(_get_int_param({"k": "3.9"}, "k", 0), 3)
+        self.assertEqual(_get_int_param({"k": "1e2"}, "k", 0), 100)
+        # String fallbacks (empty, invalid, NaN token)
+        self.assertEqual(_get_int_param({"k": ""}, "k", 5), 5)
+        self.assertEqual(_get_int_param({"k": "abc"}, "k", 5), 5)
+        self.assertEqual(_get_int_param({"k": "NaN"}, "k", 5), 5)
+        # Unsupported type
+        self.assertEqual(_get_int_param({"k": [1, 2, 3]}, "k", 3), 3)
+        # Missing key with non-numeric default
+        self.assertEqual(_get_int_param({}, "missing", "zzz"), 0)
+
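# Editorial sketch: a coercion helper consistent with every assertion in
# test_get_int_param_coercions above (illustrative only; not the actual
# _get_int_param implementation from reward_space_analysis).
import math

def _get_int_param_sketch(params: dict, key: str, default) -> int:
    fallback = int(default) if isinstance(default, (bool, int)) else 0
    value = params.get(key, default)
    if isinstance(value, bool):  # check bool before int: bool subclasses int
        return int(value)
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        return int(value) if math.isfinite(value) else fallback
    if isinstance(value, str):
        try:
            parsed = float(value.strip())
        except ValueError:
            return fallback
        return int(parsed) if math.isfinite(parsed) else fallback
    return fallback  # unsupported types (None, lists, ...) fall back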
     def test_transform_bulk_monotonicity_and_bounds(self):
         """Non-decreasing monotonicity & (-1,1) bounds for smooth transforms (excluding clip)."""
         transforms = ["tanh", "softsign", "arctan", "sigmoid", "asinh"]
@@ -3293,33 +3405,6 @@ class TestPBRS(RewardSpaceTestBase):
         self.assertAlmostEqualFloat(s_base, s_scaled, tolerance=self.TOL_DISTRIB_SHAPE)
         self.assertAlmostEqualFloat(k_base, k_scaled, tolerance=self.TOL_DISTRIB_SHAPE)
 
-    def test_js_symmetry_and_kl_relation_bound(self):
-        """JS distance symmetry & upper bound sqrt(log 2)."""
-        rng = np.random.default_rng(9090)
-        p_raw = rng.uniform(0.0, 1.0, size=300)
-        q_raw = rng.uniform(0.0, 1.0, size=300)
-        p = p_raw / p_raw.sum()
-        q = q_raw / q_raw.sum()
-        m = 0.5 * (p + q)
-
-        def _kl(a, b):
-            mask = (a > 0) & (b > 0)
-            return float(np.sum(a[mask] * np.log(a[mask] / b[mask])))
-
-        js_div = 0.5 * _kl(p, m) + 0.5 * _kl(q, m)
-        js_dist = math.sqrt(max(js_div, 0.0))
-        self.assertDistanceMetric(js_dist, name="js_distance")
-        # Upper bound plus strict identity epsilon guard
-        self.assertLessEqual(
-            js_dist,
-            self.JS_DISTANCE_UPPER_BOUND + self.TOL_IDENTITY_STRICT,
-        )
-        js_div_swap = 0.5 * _kl(q, m) + 0.5 * _kl(p, m)
-        js_dist_swap = math.sqrt(max(js_div_swap, 0.0))
-        self.assertAlmostEqualFloat(
-            js_dist, js_dist_swap, tolerance=self.TOL_GENERIC_EQ
-        )
-
 
 class TestReportFormatting(RewardSpaceTestBase):
     """Tests for report formatting elements not previously covered."""
@@ -3553,6 +3638,53 @@ class TestReportFormatting(RewardSpaceTestBase):
             self.assertLessEqual(abs(shap), self.PBRS_MAX_ABS_SHAPING)
 
 
+class TestBootstrapStatistics(RewardSpaceTestBase):
+    """Grouped tests for bootstrap confidence interval behavior."""
+
+    def test_constant_distribution_bootstrap_and_diagnostics(self):
+        """Degenerate columns produce (mean≈lo≈hi) zero-width intervals."""
+        df = self._const_df(80)
+        res = bootstrap_confidence_intervals(
+            df, ["reward_total", "pnl"], n_bootstrap=200, confidence_level=0.95
+        )
+        for k, (mean, lo, hi) in res.items():
+            self.assertAlmostEqualFloat(mean, lo, tolerance=2e-9)
+            self.assertAlmostEqualFloat(mean, hi, tolerance=2e-9)
+            self.assertLessEqual(hi - lo, 2e-9)
+
+    def test_bootstrap_shrinkage_with_sample_size(self):
+        """Half-width decreases with larger sample (~1/sqrt(n) heuristic)."""
+        small = self._shift_scale_df(80)
+        large = self._shift_scale_df(800)
+        res_small = bootstrap_confidence_intervals(
+            small, ["reward_total"], n_bootstrap=400
+        )
+        res_large = bootstrap_confidence_intervals(
+            large, ["reward_total"], n_bootstrap=400
+        )
+        (_, lo_s, hi_s) = list(res_small.values())[0]
+        (_, lo_l, hi_l) = list(res_large.values())[0]
+        hw_small = (hi_s - lo_s) / 2.0
+        hw_large = (hi_l - lo_l) / 2.0
+        self.assertFinite(hw_small, name="hw_small")
+        self.assertFinite(hw_large, name="hw_large")
+        self.assertLess(hw_large, hw_small * 0.55)
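        # Editorial note: n grows 80 -> 800, so the ~1/sqrt(n) heuristic predicts
        # a half-width ratio near sqrt(80/800) = 1/sqrt(10) ≈ 0.316; the 0.55
        # threshold leaves headroom for bootstrap noise.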
+
+    def test_bootstrap_confidence_intervals_basic(self):
+        """Basic CI computation returns ordered finite bounds."""
+        test_data = self.make_stats_df(n=100, seed=self.SEED)
+        results = bootstrap_confidence_intervals(
+            test_data,
+            ["reward_total", "pnl"],
+            n_bootstrap=100,
+        )
+        for metric, (mean, ci_low, ci_high) in results.items():
+            self.assertFinite(mean, name=f"mean[{metric}]")
+            self.assertFinite(ci_low, name=f"ci_low[{metric}]")
+            self.assertFinite(ci_high, name=f"ci_high[{metric}]")
+            self.assertLess(ci_low, ci_high)
+
+
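For reference, a minimal percentile-bootstrap sketch of the behavior these tests exercise (editorial; the bootstrap_confidence_intervals signature is taken from the calls above, its internals are assumed):

import numpy as np

def percentile_ci(values, n_bootstrap=400, confidence_level=0.95, seed=0):
    """Percentile bootstrap CI for the mean, returned as (mean, lo, hi)."""
    rng = np.random.default_rng(seed)
    means = np.array([
        rng.choice(values, size=len(values), replace=True).mean()
        for _ in range(n_bootstrap)
    ])
    alpha = 1.0 - confidence_level
    lo, hi = np.quantile(means, [alpha / 2.0, 1.0 - alpha / 2.0])
    return float(np.mean(values)), float(lo), float(hi)

# Constant input yields a zero-width interval; a tenfold larger sample
# shrinks the half-width by roughly 1/sqrt(10).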
 if __name__ == "__main__":
     # Configure test discovery and execution
     loader = unittest.TestLoader()