]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
feat(reward): allow exit_piecewise_grace >1 and extend no-attenuation region; docs...
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 6 Oct 2025 12:55:28 +0000 (14:55 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 6 Oct 2025 12:55:28 +0000 (14:55 +0200)
ReforceXY/reward_space_analysis/README.md
ReforceXY/reward_space_analysis/reward_space_analysis.py
ReforceXY/reward_space_analysis/test_reward_space_analysis.py

index cab7e5cec960c8ee59064a03a777c7ab41c460ff..d77e8f6ec2e64356cf1eedc2203cc4431125c278 100644 (file)
@@ -230,7 +230,7 @@ _Exit factor configuration:_
 
 - `exit_factor_mode` (default: piecewise) - Time attenuation mode for exit factor (legacy|sqrt|linear|power|piecewise|half_life)
 - `exit_linear_slope` (default: 1.0) - Slope for linear exit attenuation
-- `exit_piecewise_grace` (default: 1.0) - Grace region fraction [0,1]; divisor=1 within grace
+- `exit_piecewise_grace` (default: 1.0) - Grace region boundary (duration ratio); values >1.0 extend no-attenuation period
 - `exit_piecewise_slope` (default: 1.0) - Slope after grace for piecewise mode (0 ⇒ flat beyond grace)
 - `exit_power_tau` (default: 0.5) - Tau in (0,1] mapped to alpha = -ln(tau)/ln(2)
 - `exit_half_life` (default: 0.5) - Half-life for exponential decay exit mode (factor *= 2^(-r/half_life))
@@ -664,7 +664,7 @@ Design intent: maintain a single canonical defaults map + explicit bounds; no si
 | `holding_penalty_scale` | 0.0 | — | Scale ≥ 0 |
 | `holding_penalty_power` | 0.0 | — | Power exponent ≥ 0 |
 | `exit_linear_slope` | 0.0 | — | Slope ≥ 0 |
-| `exit_piecewise_grace` | 0.0 | 1.0 | Fraction of max duration (grace region) |
+| `exit_piecewise_grace` | 0.0 | — | Grace boundary expressed in duration ratio units (can exceed 1.0 to extend full-strength region) |
 | `exit_piecewise_slope` | 0.0 | — | Slope ≥ 0 |
 | `exit_power_tau` | 1e-6 | 1.0 | Mapped to alpha = -ln(tau) |
 | `exit_half_life` | 1e-6 | — | Half-life in duration ratio units |
index 9c698fb56394eea363e18585c5faabdd98de867e..6456db6b1d605a014185111bf56ded339a8f0e40 100644 (file)
@@ -96,8 +96,9 @@ def _piecewise_duration_divisor(
     exception fallback in ``_get_exit_factor`` without duplicating logic.
     """
     exit_piecewise_grace = _get_param_float(params, "exit_piecewise_grace", 1.0)
-    if not (0.0 <= exit_piecewise_grace <= 1.0):  # sanitize grace range
-        exit_piecewise_grace = 1.0
+    # Only enforce a lower bound; values >1.0 extend the grace region beyond max duration ratio.
+    if exit_piecewise_grace < 0.0:
+        exit_piecewise_grace = 0.0
     exit_piecewise_slope = _get_param_float(params, "exit_piecewise_slope", 1.0)
     if exit_piecewise_slope < 0.0:  # sanitize slope sign
         exit_piecewise_slope = 1.0
@@ -387,7 +388,7 @@ def _get_exit_factor(
             "power",
             "half_life",
         }:
-            # Default & fallback behaviour consolidated
+            # Default behaviour
             factor /= _piecewise_duration_divisor(duration_ratio, params)
         elif exit_factor_mode == "half_life":
             exit_half_life = _get_param_float(params, "exit_half_life", 0.5)
@@ -487,16 +488,11 @@ def _idle_penalty(
     """Mirror the environment's idle penalty behaviour."""
     idle_penalty_scale = _get_param_float(params, "idle_penalty_scale", 1.0)
     idle_penalty_power = _get_param_float(params, "idle_penalty_power", 1.0)
-    max_idle_duration_cfg = int(
+    max_idle_duration = int(
         params.get(
-            "max_idle_duration_candles", params.get("max_trade_duration_candles", 0)
+            "max_idle_duration_candles", params.get("max_trade_duration_candles", 128)
         )
     )
-    # Fallback: align with documented intent -> use context.max_trade_duration when cfg <= 0
-    if max_idle_duration_cfg <= 0:
-        max_idle_duration = context.max_trade_duration
-    else:
-        max_idle_duration = max_idle_duration_cfg
     idle_duration_ratio = context.idle_duration / max(1, max_idle_duration)
     return -idle_factor * idle_penalty_scale * idle_duration_ratio**idle_penalty_power
 
index 5e61c742a9664c2960cf302698015e5e48c0fb5a..2d39e9ee0382302b8e2d67bd2658dcc9d4a0c6cb 100644 (file)
@@ -2119,6 +2119,7 @@ class TestRewardRobustness(RewardSpaceTestBase):
     def test_piecewise_slope_zero_constant_after_grace(self):
         """Piecewise slope=0 should yield flat factor after grace boundary."""
         from reward_space_analysis import compute_exit_factor
+
         params = self.DEFAULT_PARAMS.copy()
         params.update(
             {
@@ -2144,9 +2145,37 @@ class TestRewardRobustness(RewardSpaceTestBase):
                 msg=f"Piecewise slope=0 factor drift at ratio set {ratios} => {values}",
             )
 
+    def test_piecewise_grace_extends_beyond_one(self):
+        """Grace >1.0 should keep divisor=1 (no attenuation) past duration_ratio=1."""
+        from reward_space_analysis import compute_exit_factor
+
+        params = self.DEFAULT_PARAMS.copy()
+        params.update(
+            {
+                "exit_factor_mode": "piecewise",
+                "exit_piecewise_grace": 1.5,  # extend grace beyond max duration ratio 1.0
+                "exit_piecewise_slope": 2.0,
+            }
+        )
+        base_factor = 80.0
+        pnl = 0.03
+        pnl_factor = 1.1
+        # Ratios straddling 1.0 but below grace=1.5 plus one beyond grace
+        ratios = [0.8, 1.0, 1.2, 1.4, 1.6]
+        vals = [compute_exit_factor(base_factor, pnl, pnl_factor, r, params) for r in ratios]
+        # All ratios <=1.5 should yield identical factor
+        ref = vals[0]
+        for i, r in enumerate(ratios[:-1]):  # exclude last (1.6)
+            self.assertAlmostEqualFloat(
+                vals[i], ref, 1e-9, msg=f"Unexpected attenuation before grace end at ratio {r}"
+            )
+        # Last ratio (1.6) should be attenuated (strictly less than ref)
+        self.assertLess(vals[-1], ref, "Attenuation should begin after grace boundary")
+
     def test_legacy_step_non_monotonic(self):
         """Legacy mode applies step change at duration_ratio=1 (should not be monotonic)."""
         from reward_space_analysis import compute_exit_factor
+
         params = self.DEFAULT_PARAMS.copy()
         params["exit_factor_mode"] = "legacy"
         base_factor = 100.0
@@ -2169,6 +2198,7 @@ class TestRewardRobustness(RewardSpaceTestBase):
     def test_exit_factor_non_negative_with_positive_pnl(self):
         """Exit factor must not be negative when pnl >= 0 (invariant clamp)."""
         from reward_space_analysis import compute_exit_factor
+
         params = self.DEFAULT_PARAMS.copy()
         # Try multiple modes / extreme params
         modes = ["linear", "power", "piecewise", "half_life", "sqrt", "legacy"]
@@ -2180,7 +2210,9 @@ class TestRewardRobustness(RewardSpaceTestBase):
             params_mode["exit_factor_mode"] = mode
             val = compute_exit_factor(base_factor, pnl, pnl_factor, 2.0, params_mode)
             self.assertGreaterEqual(
-                val, 0.0, f"Exit factor should be >=0 for non-negative pnl in mode {mode}"
+                val,
+                0.0,
+                f"Exit factor should be >=0 for non-negative pnl in mode {mode}",
             )
 
 
@@ -2309,7 +2341,18 @@ class TestParameterValidation(RewardSpaceTestBase):
         params["exit_factor_threshold"] = 10.0  # low threshold to trigger easily
         # Remove base_factor to allow argument override
         params.pop("base_factor", None)
-        context = self._mk_context(pnl=0.06, trade_duration=10)
+        from reward_space_analysis import RewardContext, Actions, Positions
+        context = RewardContext(
+            pnl=0.06,
+            trade_duration=10,
+            idle_duration=0,
+            max_trade_duration=128,
+            max_unrealized_profit=0.08,
+            min_unrealized_profit=0.0,
+            position=Positions.Long,
+            action=Actions.Long_exit,
+            force_action=None,
+        )
         with _warnings.catch_warnings(record=True) as w:
             _warnings.simplefilter("always")
             br = calculate_reward(