Piment Noir Git Repositories - freqai-strategies.git/commitdiff
perf(reforcexy): move pnl-aware rewarding logic to trade exit actions
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 5 Oct 2025 18:35:16 +0000 (20:35 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 5 Oct 2025 18:35:16 +0000 (20:35 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
ReforceXY/reward_space_analysis/README.md
ReforceXY/reward_space_analysis/reward_space_analysis.py
ReforceXY/reward_space_analysis/test_reward_space_analysis.py
ReforceXY/user_data/freqaimodels/ReforceXY.py

index 6a75f1f16f30566dfca7c6a4d2c37fecc78638e8..02d4e2825b0a0e916d6c517745680721513e6030 100644 (file)
@@ -13,9 +13,9 @@ This tool helps you understand and validate how the ReforceXY reinforcement lear
 - ✅ Generate thousands of synthetic trading scenarios deterministically
 - ✅ Analyze reward distribution, feature importance & partial dependence
 - ✅ Built‑in invariant & statistical validation layers (fail‑fast)
-- ✅ Export reproducible artifacts (hash + manifest)
+- ✅ Export reproducible artifacts (parameter hash + execution manifest)
 - ✅ Compare synthetic vs real trading data (distribution shift metrics)
-- ✅ Parameter bounds & automatic sanitization
+- ✅ Parameter bounds validation & automatic sanitization
 
 ---
 
@@ -47,7 +47,7 @@ Whenever you need to run analyses or tests, activate the environment first:
 
 ```shell
 source .venv/bin/activate
-python reward_space_analysis.py --num_samples 20000 --output demo
+python reward_space_analysis.py --num_samples 20000 --output reward_space_outputs
 python test_reward_space_analysis.py
 ```
 
@@ -64,14 +64,15 @@ Unless otherwise noted, the command examples below assume your current working d
 **Goal:** Ensure rewards behave as expected in different scenarios
 
 ```shell
-python reward_space_analysis.py --num_samples 20000 --output validation
+python reward_space_analysis.py --num_samples 20000 --output reward_space_outputs
 ```
 
 **Check in `statistical_analysis.md`:**
 
 - Long/Short exits should have positive average rewards
-- Invalid actions should have negative penalties
-- Idle periods should reduce rewards
+- Invalid actions should incur a negative penalty (default: -2.0; see the sketch below)
+- Idle periods should reduce rewards progressively
+- Validation layers report any invariant violations
 
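The first two checks can be reproduced directly against the public API. A minimal sketch, assuming the exports shown in `test_reward_space_analysis.py` (a flat `Positions.Neutral` state is assumed to exist, following Base5ActionRLEnv conventions):

```python
from reward_space_analysis import (
    DEFAULT_MODEL_REWARD_PARAMETERS,
    Actions,
    Positions,
    RewardContext,
    calculate_reward,
)

# Exiting a long while flat is invalid: with masking disabled, the
# invalid_action penalty (default -2.0) should be the entire reward.
context = RewardContext(
    pnl=0.0, trade_duration=0, idle_duration=0, max_trade_duration=128,
    max_unrealized_profit=0.0, min_unrealized_profit=0.0,
    position=Positions.Neutral, action=Actions.Long_exit, force_action=None,
)
breakdown = calculate_reward(
    context, DEFAULT_MODEL_REWARD_PARAMETERS, base_factor=100.0,
    profit_target=0.03, risk_reward_ratio=1.0,
    short_allowed=True, action_masking=False,
)
assert breakdown.total == breakdown.invalid_penalty == -2.0
```
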
 ### 2. Analyze Parameter Sensitivity
 
@@ -82,12 +83,12 @@ python reward_space_analysis.py --num_samples 20000 --output validation
 python reward_space_analysis.py \
     --num_samples 30000 \
     --params win_reward_factor=2.0 \
-    --output conservative
+    --output conservative_rewards
 
 python reward_space_analysis.py \
     --num_samples 30000 \
     --params win_reward_factor=4.0 \
-    --output aggressive
+    --output aggressive_rewards
 ```
 
 **Compare:** Reward distributions between runs in `statistical_analysis.md`
@@ -120,7 +121,7 @@ python reward_space_analysis.py \
 python reward_space_analysis.py \
     --num_samples 100000 \
     --real_episodes ../user_data/transitions/*.pkl \
-    --output comparison
+    --output real_vs_synthetic
 ```
 
 ---
@@ -221,8 +222,7 @@ _Idle penalty configuration:_
 
 _Holding penalty configuration:_
 
-- `holding_duration_ratio_grace` (default: 1.0) - Grace region before holding penalty increases with duration ratio
-- `holding_penalty_scale` (default: 0.3) - Scale of holding penalty
+- `holding_penalty_scale` (default: 0.5) - Scale of holding penalty
 - `holding_penalty_power` (default: 1.0) - Power applied to holding penalty scaling
 
 _Exit factor configuration:_
@@ -284,10 +284,10 @@ python reward_space_analysis.py --num_samples 50000 --seed 777
 
 #### Direct Tunable Overrides vs `--params`
 
-Every key in `DEFAULT_MODEL_REWARD_PARAMETERS` is also exposed as an individual CLI flag (dynamic generation). You may choose either style:
+All reward parameters are also available as individual CLI flags. You may choose either style:
 
 ```shell
-# Direct flag style (dynamic CLI args)
+# Direct flag style
 python reward_space_analysis.py --win_reward_factor 3.0 --idle_penalty_scale 2.0 --num_samples 15000
 
 # Equivalent using --params
@@ -387,7 +387,7 @@ python reward_space_analysis.py \
 # Test aggressive holding penalties
 python reward_space_analysis.py \
     --num_samples 25000 \
-    --params holding_penalty_scale=0.5 holding_duration_ratio_grace=0.8 \
+    --params holding_penalty_scale=0.5 \
     --output aggressive_holding
 ```
 
@@ -430,7 +430,7 @@ done
 python test_reward_space_analysis.py
 ```
 
-The suite currently contains 32 focused tests (coverage ~84%). Example (abridged) successful run shows all test_* cases passing (see file for full list). Number may increase as validations expand.
+The suite currently contains 34 focused tests (coverage ~84%). An example (abridged) successful run shows all test_* cases passing (see the file for the full list); the count may increase as validations expand.
 
 ### Test Categories
 
@@ -439,17 +439,18 @@ The suite currently contains 32 focused tests (coverage ~84%). Example (abridged
 | Integration | TestIntegration | CLI, artifacts, manifest reproducibility |
 | Statistical Coherence | TestStatisticalCoherence | Distribution shift, diagnostics, hypothesis basics |
 | Reward Alignment | TestRewardAlignment | Component correctness & exit factors |
-| Public API | TestPublicFunctions | Parsing, bootstrap, reproducibility seeds |
-| Edge Cases | TestEdgeCases | Extreme params & exit modes diversity |
-| Utility | TestUtilityFunctions | I/O writers, model analysis, helper conversions |
-| Private via Public | TestPrivateFunctionsViaPublicAPI | Penalties & exit reward paths |
+| Public API | TestPublicAPI | Core API functions and interfaces |
+| Statistical Validation | TestStatisticalValidation | Mathematical bounds and validation |
+| Boundary Conditions | TestBoundaryConditions | Extreme params & edge cases |
+| Helper Functions | TestHelperFunctions | I/O writers, model analysis, utility conversions |
+| Private Functions | TestPrivateFunctions | Penalty logic & internal reward calculations |
 
 ### Test Architecture
 
 - **Single test file**: `test_reward_space_analysis.py` (consolidates all testing)
 - **Base class**: `RewardSpaceTestBase` with shared configuration and utilities
 - **Unified framework**: `unittest` with optional `pytest` configuration
-- **Reproducible**: Fixed seed (`SEED = 42`) for consistent results
+- **Reproducible**: Fixed seed (`seed = 42`) for consistent results
 
 ### Code Coverage Analysis
 
@@ -575,7 +576,7 @@ source .venv/bin/activate
 pip install pandas numpy scipy scikit-learn
 
 # Basic analysis
-python reward_space_analysis.py --num_samples 20000 --output my_analysis
+python reward_space_analysis.py --num_samples 20000 --output reward_space_outputs
 
 # Run validation tests
 python test_reward_space_analysis.py
@@ -657,7 +658,6 @@ Design intent: maintain a single canonical defaults map + explicit bounds; no si
 | `base_factor` | 0.0 | — | Global scaling factor |
 | `idle_penalty_power` | 0.0 | — | Power exponent ≥ 0 |
 | `idle_penalty_scale` | 0.0 | — | Scale ≥ 0 |
-| `holding_duration_ratio_grace` | 0.0 | - | Fraction of max duration |
 | `holding_penalty_scale` | 0.0 | — | Scale ≥ 0 |
 | `holding_penalty_power` | 0.0 | — | Power exponent ≥ 0 |
 | `exit_linear_slope` | 0.0 | — | Slope ≥ 0 |
index 9e5849dad74ad58b9c50c3f873e47123e5d3ca8e..fa256e4252db7551ca606aa0db2beaedb79afa99 100644 (file)
@@ -73,6 +73,24 @@ def _to_bool(value: Any) -> bool:
     return bool(text)
 
 
+def _get_param_float(params: Dict[str, float | str], key: str, default: float) -> float:
+    """Extract float parameter with type safety and default fallback."""
+    value = params.get(key, default)
+    if isinstance(value, (int, float)):
+        return float(value)
+    if isinstance(value, str):
+        try:
+            return float(value)
+        except (ValueError, TypeError):
+            return default
+    return default
+
+
+def _compute_duration_ratio(trade_duration: int, max_trade_duration: int) -> float:
+    """Compute duration ratio with safe division."""
+    return trade_duration / max(1, max_trade_duration)
+
+
 def _is_short_allowed(trading_mode: str) -> bool:
     mode = trading_mode.lower()
     if mode in {"margin", "futures"}:
@@ -82,6 +100,9 @@ def _is_short_allowed(trading_mode: str) -> bool:
     raise ValueError("Unsupported trading mode. Expected one of: spot, margin, futures")
 
 
+# Mathematical constant pre-computed for performance
+_LOG_2 = math.log(2.0)
+
 DEFAULT_MODEL_REWARD_PARAMETERS: Dict[str, float | str] = {
     "invalid_action": -2.0,
     "base_factor": 100.0,
@@ -89,8 +110,7 @@ DEFAULT_MODEL_REWARD_PARAMETERS: Dict[str, float | str] = {
     "idle_penalty_power": 1.0,
     "idle_penalty_scale": 1.0,
     # Holding keys (env defaults)
-    "holding_duration_ratio_grace": 1.0,
-    "holding_penalty_scale": 0.3,
+    "holding_penalty_scale": 0.5,
     "holding_penalty_power": 1.0,
     # Exit factor configuration (env defaults)
     "exit_factor_mode": "piecewise",
@@ -112,7 +132,6 @@ DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = {
     "base_factor": "Base reward factor used inside the environment.",
     "idle_penalty_power": "Power applied to idle penalty scaling.",
     "idle_penalty_scale": "Scale of idle penalty.",
-    "holding_duration_ratio_grace": "Grace ratio (<=1) before holding penalty increases with duration ratio.",
     "holding_penalty_scale": "Scale of holding penalty.",
     "holding_penalty_power": "Power applied to holding penalty scaling.",
     "exit_factor_mode": "Time attenuation mode for exit factor.",
@@ -138,7 +157,6 @@ _PARAMETER_BOUNDS: Dict[str, Dict[str, float]] = {
     "base_factor": {"min": 0.0},
     "idle_penalty_power": {"min": 0.0},
     "idle_penalty_scale": {"min": 0.0},
-    "holding_duration_ratio_grace": {"min": 0.0},
     "holding_penalty_scale": {"min": 0.0},
     "holding_penalty_power": {"min": 0.0},
     "exit_linear_slope": {"min": 0.0},
@@ -291,7 +309,7 @@ def _get_exit_factor(
     elif exit_factor_mode == "sqrt":
         factor /= math.sqrt(1.0 + duration_ratio)
     elif exit_factor_mode == "linear":
-        slope = float(params.get("exit_linear_slope", 1.0))
+        slope = _get_param_float(params, "exit_linear_slope", 1.0)
         if slope < 0.0:
             slope = 1.0
         factor /= 1.0 + slope * duration_ratio
@@ -304,17 +322,17 @@ def _get_exit_factor(
             if isinstance(exit_power_tau, (int, float)):
                 exit_power_tau = float(exit_power_tau)
                 if 0.0 < exit_power_tau <= 1.0:
-                    exit_power_alpha = -math.log(exit_power_tau) / math.log(2.0)
+                    exit_power_alpha = -math.log(exit_power_tau) / _LOG_2
         if not isinstance(exit_power_alpha, (int, float)):
             exit_power_alpha = 1.0
         else:
             exit_power_alpha = float(exit_power_alpha)
         factor /= math.pow(1.0 + duration_ratio, exit_power_alpha)
     elif exit_factor_mode == "piecewise":
-        exit_piecewise_grace = float(params.get("exit_piecewise_grace", 1.0))
+        exit_piecewise_grace = _get_param_float(params, "exit_piecewise_grace", 1.0)
         if not (0.0 <= exit_piecewise_grace <= 1.0):
             exit_piecewise_grace = 1.0
-        exit_piecewise_slope = float(params.get("exit_piecewise_slope", 1.0))
+        exit_piecewise_slope = _get_param_float(params, "exit_piecewise_slope", 1.0)
         if exit_piecewise_slope < 0.0:
             exit_piecewise_slope = 1.0
         if duration_ratio <= exit_piecewise_grace:
             duration_divisor = 1.0
         else:
             duration_divisor = 1.0 + exit_piecewise_slope * (
                 duration_ratio - exit_piecewise_grace
             )
         factor /= duration_divisor
     elif exit_factor_mode == "half_life":
-        exit_half_life = float(params.get("exit_half_life", 0.5))
+        exit_half_life = _get_param_float(params, "exit_half_life", 0.5)
         if exit_half_life <= 0.0:
             exit_half_life = 0.5
         factor *= math.pow(2.0, -duration_ratio / exit_half_life)
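For intuition, the time attenuation each mode applies to the exit factor can be sketched in isolation. This mirrors only the formulas visible in this hunk (assumed defaults: `exit_linear_slope=1.0`, `exit_half_life=0.5`), not the full `_get_exit_factor` pipeline:

```python
import math

def attenuation(mode: str, duration_ratio: float) -> float:
    """Isolated attenuation term for a subset of exit_factor_mode values."""
    if mode == "sqrt":
        return 1.0 / math.sqrt(1.0 + duration_ratio)
    if mode == "linear":  # exit_linear_slope = 1.0
        return 1.0 / (1.0 + 1.0 * duration_ratio)
    if mode == "half_life":  # exit_half_life = 0.5
        return math.pow(2.0, -duration_ratio / 0.5)
    raise ValueError(f"unsupported mode: {mode}")

for r in (0.0, 0.5, 1.0, 2.0):
    print(f"r={r:.1f}  sqrt={attenuation('sqrt', r):.3f}  "
          f"linear={attenuation('linear', r):.3f}  "
          f"half_life={attenuation('half_life', r):.3f}")
```
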
@@ -398,8 +416,8 @@ def _idle_penalty(
     context: RewardContext, idle_factor: float, params: Dict[str, float | str]
 ) -> float:
     """Mirror the environment's idle penalty behaviour."""
-    idle_penalty_scale = float(params.get("idle_penalty_scale", 1.0))
-    idle_penalty_power = float(params.get("idle_penalty_power", 1.0))
+    idle_penalty_scale = _get_param_float(params, "idle_penalty_scale", 1.0)
+    idle_penalty_power = _get_param_float(params, "idle_penalty_power", 1.0)
     max_idle_duration = int(
         params.get(
             "max_idle_duration_candles",
@@ -411,44 +429,23 @@ def _idle_penalty(
 
 
 def _holding_penalty(
-    context: RewardContext,
-    holding_factor: float,
-    params: Dict[str, float | str],
-    profit_target: float,
+    context: RewardContext, holding_factor: float, params: Dict[str, float | str]
 ) -> float:
     """Mirror the environment's holding penalty behaviour."""
-    holding_duration_ratio_grace = float(
-        params.get("holding_duration_ratio_grace", 1.0)
+    holding_penalty_scale = _get_param_float(params, "holding_penalty_scale", 0.5)
+    holding_penalty_power = _get_param_float(params, "holding_penalty_power", 1.0)
+    duration_ratio = _compute_duration_ratio(
+        context.trade_duration, context.max_trade_duration
     )
-    if holding_duration_ratio_grace <= 0.0:
-        holding_duration_ratio_grace = 1.0
-    holding_penalty_scale = float(params.get("holding_penalty_scale", 0.3))
-    holding_penalty_power = float(params.get("holding_penalty_power", 1.0))
-    duration_ratio = context.trade_duration / max(1, context.max_trade_duration)
-    pnl = context.pnl
 
-    if pnl >= profit_target:
-        if duration_ratio <= holding_duration_ratio_grace:
-            effective_duration_ratio = duration_ratio / holding_duration_ratio_grace
-        else:
-            effective_duration_ratio = 1.0 + (
-                duration_ratio - holding_duration_ratio_grace
-            )
-        return (
-            -holding_factor
-            * holding_penalty_scale
-            * effective_duration_ratio**holding_penalty_power
-        )
-
-    if duration_ratio > holding_duration_ratio_grace:
-        return (
-            -holding_factor
-            * holding_penalty_scale
-            * (1.0 + (duration_ratio - holding_duration_ratio_grace))
-            ** holding_penalty_power
-        )
+    if duration_ratio < 1.0:
+        return 0.0
 
-    return 0.0
+    return (
+        -holding_factor
+        * holding_penalty_scale
+        * (duration_ratio - 1.0) ** holding_penalty_power
+    )
 
 
 def _compute_exit_reward(
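The behavioral shift in `_holding_penalty` is easiest to see numerically. A worked sketch using the new defaults and the factor values from the test setup (`base_factor=100.0`, `profit_target=0.03`, `risk_reward_ratio=1.0`, hence `holding_factor = idle_factor = 100.0 * 0.03 / 3.0 = 1.0`):

```python
holding_factor = 1.0     # idle_factor under the assumptions above
scale, power = 0.5, 1.0  # new holding_penalty_scale / holding_penalty_power defaults

def holding_penalty(duration_ratio: float) -> float:
    # Mirrors the simplified branch: no penalty until max_trade_duration is reached.
    if duration_ratio < 1.0:
        return 0.0
    return -holding_factor * scale * (duration_ratio - 1.0) ** power

assert holding_penalty(0.5) == 0.0    # within max_trade_duration
assert holding_penalty(1.0) == 0.0    # at the boundary: (0.0) ** power == 0.0
assert holding_penalty(1.5) == -0.25  # 50% overrun: -1.0 * 0.5 * 0.5
```
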
@@ -458,7 +455,9 @@ def _compute_exit_reward(
     params: Dict[str, float | str],
 ) -> float:
     """Compose the exit reward: pnl * exit_factor."""
-    duration_ratio = context.trade_duration / max(1, context.max_trade_duration)
+    duration_ratio = _compute_duration_ratio(
+        context.trade_duration, context.max_trade_duration
+    )
     exit_factor = _get_exit_factor(
         base_factor,
         context.pnl,
@@ -488,11 +487,11 @@ def calculate_reward(
         force_action=context.force_action,
     )
     if not is_valid and not action_masking:
-        breakdown.invalid_penalty = float(params.get("invalid_action", -2.0))
+        breakdown.invalid_penalty = _get_param_float(params, "invalid_action", -2.0)
         breakdown.total = breakdown.invalid_penalty
         return breakdown
 
-    factor = float(params.get("base_factor", base_factor))
+    factor = _get_param_float(params, "base_factor", base_factor)
 
     profit_target_override = params.get("profit_target")
     if isinstance(profit_target_override, (int, float)):
@@ -509,8 +508,7 @@ def calculate_reward(
     profit_target_final = profit_target * risk_reward_ratio
     idle_factor = factor * profit_target_final / 3.0
     pnl_factor = _get_pnl_factor(params, context, profit_target_final)
-    holding_factor = idle_factor * pnl_factor
-    duration_ratio = context.trade_duration / max(1, context.max_trade_duration)
+    holding_factor = idle_factor
 
     if context.force_action in (
         ForceActions.Take_profit,
@@ -536,21 +534,8 @@ def calculate_reward(
         context.position in (Positions.Long, Positions.Short)
         and context.action == Actions.Neutral
     ):
-        holding_duration_ratio_grace = float(
-            params.get("holding_duration_ratio_grace", 1.0)
-        )
-        if holding_duration_ratio_grace <= 0.0:
-            holding_duration_ratio_grace = 1.0
-        if (
-            context.pnl >= profit_target_final
-            or duration_ratio > holding_duration_ratio_grace
-        ):
-            breakdown.holding_penalty = _holding_penalty(
-                context, holding_factor, params, profit_target_final
-            )
-            breakdown.total = breakdown.holding_penalty
-            return breakdown
-        breakdown.total = 0.0
+        breakdown.holding_penalty = _holding_penalty(context, holding_factor, params)
+        breakdown.total = breakdown.holding_penalty
         return breakdown
 
     if context.action == Actions.Long_exit and context.position == Positions.Long:
@@ -1947,7 +1932,7 @@ def build_argument_parser() -> argparse.ArgumentParser:
         nargs="*",
         default=[],
         metavar="KEY=VALUE",
-        help="Override reward parameters, e.g. holding_duration_ratio_grace=1.2",
+        help="Override reward parameters, e.g. holding_penalty_scale=0.5",
     )
     # Dynamically add CLI options for all tunables
     add_tunable_cli_args(parser)
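The new `_get_param_float` helper degrades gracefully on malformed overrides, which is what keeps the dynamic CLI flags safe. A minimal sketch, assuming the module-private helper is imported directly:

```python
from reward_space_analysis import _get_param_float

params = {"holding_penalty_scale": "0.5", "exit_half_life": "n/a"}
assert _get_param_float(params, "holding_penalty_scale", 1.0) == 0.5  # numeric string coerced
assert _get_param_float(params, "exit_half_life", 0.5) == 0.5         # unparsable -> default
assert _get_param_float(params, "missing_key", 2.0) == 2.0            # absent -> default
```
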
index cbb3ae852dafb614e136c2eaf4413f4a08bd6a54..8ffa10b7f745916e0ae7721a1c587972edb85b79 100644 (file)
@@ -7,14 +7,15 @@ Covers:
 - Reward alignment with environment
 """
 
-import unittest
-import tempfile
+import json
+import pickle
 import shutil
-from pathlib import Path
 import subprocess
 import sys
-import json
-import pickle
+import tempfile
+import unittest
+from pathlib import Path
+
 import numpy as np
 import pandas as pd
 
@@ -27,12 +28,12 @@ try:
         ForceActions,
         Positions,
         RewardContext,
+        bootstrap_confidence_intervals,
         calculate_reward,
         compute_distribution_shift_metrics,
         distribution_diagnostics,
-        simulate_samples,
-        bootstrap_confidence_intervals,
         parse_overrides,
+        simulate_samples,
     )
 except ImportError as e:
     print(f"Import error: {e}")
@@ -394,8 +395,8 @@ class TestRewardAlignment(RewardSpaceTestBase):
             self.assertGreater(factor, 0, f"Exit factor for {mode} should be positive")
 
 
-class TestPublicFunctions(RewardSpaceTestBase):
-    """Test public functions and API."""
+class TestPublicAPI(RewardSpaceTestBase):
+    """Test public API functions and interfaces."""
 
     def test_parse_overrides(self):
         """Test parse_overrides function."""
@@ -446,8 +447,8 @@ class TestPublicFunctions(RewardSpaceTestBase):
     def test_statistical_hypothesis_tests_seed_reproducibility(self):
         """Ensure statistical_hypothesis_tests + bootstrap CIs are reproducible with stats_seed."""
         from reward_space_analysis import (
-            statistical_hypothesis_tests,
             bootstrap_confidence_intervals,
+            statistical_hypothesis_tests,
         )
 
         np.random.seed(123)
@@ -504,6 +505,10 @@ class TestPublicFunctions(RewardSpaceTestBase):
         ci_b = bootstrap_confidence_intervals(df, metrics, n_bootstrap=250, seed=2024)
         self.assertEqual(ci_a, ci_b)
 
+
+class TestStatisticalValidation(RewardSpaceTestBase):
+    """Test statistical validation and mathematical bounds."""
+
     def test_pnl_invariant_validation(self):
         """Test critical PnL invariant: only exit actions should have non-zero PnL."""
         # Generate samples and verify PnL invariant
@@ -663,11 +668,12 @@ class TestPublicFunctions(RewardSpaceTestBase):
     def test_exit_factor_mathematical_formulas(self):
         """Test mathematical correctness of exit factor calculations."""
         import math
+
         from reward_space_analysis import (
-            calculate_reward,
-            RewardContext,
             Actions,
             Positions,
+            RewardContext,
+            calculate_reward,
         )
 
         # Test context with known values
@@ -948,8 +954,8 @@ class TestPublicFunctions(RewardSpaceTestBase):
                 )
 
 
-class TestEdgeCases(RewardSpaceTestBase):
-    """Test edge cases and error conditions."""
+class TestBoundaryConditions(RewardSpaceTestBase):
+    """Test boundary conditions and edge cases."""
 
     def test_extreme_parameter_values(self):
         """Test behavior with extreme parameter values."""
@@ -1026,7 +1032,7 @@ class TestEdgeCases(RewardSpaceTestBase):
                 )
 
 
-class TestUtilityFunctions(RewardSpaceTestBase):
+class TestHelperFunctions(RewardSpaceTestBase):
     """Test utility and helper functions."""
 
     def test_to_bool(self):
@@ -1118,9 +1124,9 @@ class TestUtilityFunctions(RewardSpaceTestBase):
     def test_write_functions(self):
         """Test various write functions."""
         from reward_space_analysis import (
-            write_summary,
             write_relationship_reports,
             write_representativity_report,
+            write_summary,
         )
 
         # Create test data
@@ -1288,7 +1294,7 @@ class TestUtilityFunctions(RewardSpaceTestBase):
             )
 
 
-class TestPrivateFunctionsViaPublicAPI(RewardSpaceTestBase):
+class TestPrivateFunctions(RewardSpaceTestBase):
     """Test private functions through public API calls."""
 
     def test_idle_penalty_via_rewards(self):
@@ -1432,6 +1438,113 @@ class TestPrivateFunctionsViaPublicAPI(RewardSpaceTestBase):
             "Total should equal invalid penalty",
         )
 
+    def test_holding_penalty_zero_before_max_duration(self):
+        """Test holding penalty logic: zero penalty before max_trade_duration."""
+        max_duration = 128
+
+        # Test cases: before, at, and after max_duration
+        test_cases = [
+            (64, "before max_duration"),
+            (127, "just before max_duration"),
+            (128, "exactly at max_duration"),
+            (129, "just after max_duration"),
+            (192, "well after max_duration"),
+        ]
+
+        for trade_duration, description in test_cases:
+            with self.subTest(duration=trade_duration, desc=description):
+                context = RewardContext(
+                    pnl=0.0,  # Neutral PnL to isolate holding penalty
+                    trade_duration=trade_duration,
+                    idle_duration=0,
+                    max_trade_duration=max_duration,
+                    max_unrealized_profit=0.0,
+                    min_unrealized_profit=0.0,
+                    position=Positions.Long,
+                    action=Actions.Neutral,
+                    force_action=None,
+                )
+
+                breakdown = calculate_reward(
+                    context,
+                    self.DEFAULT_PARAMS,
+                    base_factor=100.0,
+                    profit_target=0.03,
+                    risk_reward_ratio=1.0,
+                    short_allowed=True,
+                    action_masking=True,
+                )
+
+                duration_ratio = trade_duration / max_duration
+
+                if duration_ratio < 1.0:
+                    # Before max_duration: should be exactly 0.0
+                    self.assertEqual(
+                        breakdown.holding_penalty,
+                        0.0,
+                        f"Holding penalty should be 0.0 {description} (ratio={duration_ratio:.2f})",
+                    )
+                elif duration_ratio == 1.0:
+                    # At max_duration: (1.0-1.0)^power = 0, so should be 0.0
+                    self.assertEqual(
+                        breakdown.holding_penalty,
+                        0.0,
+                        f"Holding penalty should be 0.0 {description} (ratio={duration_ratio:.2f})",
+                    )
+                else:
+                    # After max_duration: should be negative
+                    self.assertLess(
+                        breakdown.holding_penalty,
+                        0.0,
+                        f"Holding penalty should be negative {description} (ratio={duration_ratio:.2f})",
+                    )
+
+                # Total should equal holding penalty (no other components active)
+                self.assertEqual(
+                    breakdown.total,
+                    breakdown.holding_penalty,
+                    f"Total should equal holding penalty {description}",
+                )
+
+    def test_holding_penalty_progressive_scaling(self):
+        """Test that holding penalty scales progressively after max_duration."""
+        max_duration = 100
+        durations = [150, 200, 300]  # All > max_duration
+        penalties = []
+
+        for duration in durations:
+            context = RewardContext(
+                pnl=0.0,
+                trade_duration=duration,
+                idle_duration=0,
+                max_trade_duration=max_duration,
+                max_unrealized_profit=0.0,
+                min_unrealized_profit=0.0,
+                position=Positions.Long,
+                action=Actions.Neutral,
+                force_action=None,
+            )
+
+            breakdown = calculate_reward(
+                context,
+                self.DEFAULT_PARAMS,
+                base_factor=100.0,
+                profit_target=0.03,
+                risk_reward_ratio=1.0,
+                short_allowed=True,
+                action_masking=True,
+            )
+
+            penalties.append(breakdown.holding_penalty)
+
+        # Penalties should be increasingly negative (monotonic decrease)
+        for i in range(1, len(penalties)):
+            self.assertLessEqual(
+                penalties[i],
+                penalties[i - 1],
+                f"Penalty should become more negative with duration: {penalties[i]} > {penalties[i-1]}",
+            )
+
 
 if __name__ == "__main__":
     # Configure test discovery and execution
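The renamed classes can be exercised individually through the standard `unittest` CLI (run from `ReforceXY/reward_space_analysis`), e.g. to focus on the new holding-penalty tests:

```shell
python -m unittest test_reward_space_analysis.TestPrivateFunctions -v
python -m unittest test_reward_space_analysis.TestBoundaryConditions -v
```
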
index 980c1313691fc5e7ede9d155a0ac43e64571f1af..f7cdf1c9ec71aae837089051d19ca607bd5ad4c3 100644 (file)
@@ -1520,7 +1520,7 @@ class MyRLEnv(Base5ActionRLEnv):
         base_factor = float(model_reward_parameters.get("base_factor", 100.0))
         pnl_target = self.profit_aim * self.rr
         idle_factor = base_factor * pnl_target / 3.0
-        holding_factor = idle_factor * self._get_pnl_factor(pnl, pnl_target)
+        holding_factor = idle_factor
 
         # Force exits
         if self._force_action in (
@@ -1576,39 +1576,21 @@ class MyRLEnv(Base5ActionRLEnv):
             self._position in (Positions.Short, Positions.Long)
             and action == Actions.Neutral.value
         ):
-            holding_duration_ratio_grace = float(
-                model_reward_parameters.get("holding_duration_ratio_grace", 1.0)
-            )
-            if holding_duration_ratio_grace <= 0.0:
-                holding_duration_ratio_grace = 1.0
             holding_penalty_scale = float(
-                model_reward_parameters.get("holding_penalty_scale", 0.3)
+                model_reward_parameters.get("holding_penalty_scale", 0.5)
             )
             holding_penalty_power = float(
                 model_reward_parameters.get("holding_penalty_power", 1.0)
             )
-            if pnl >= pnl_target:
-                if duration_ratio <= holding_duration_ratio_grace:
-                    effective_duration_ratio = (
-                        duration_ratio / holding_duration_ratio_grace
-                    )
-                else:
-                    effective_duration_ratio = 1.0 + (
-                        duration_ratio - holding_duration_ratio_grace
-                    )
-                return (
-                    -holding_factor
-                    * holding_penalty_scale
-                    * effective_duration_ratio**holding_penalty_power
-                )
-            if duration_ratio > holding_duration_ratio_grace:
-                return (
-                    -holding_factor
-                    * holding_penalty_scale
-                    * (1.0 + (duration_ratio - holding_duration_ratio_grace))
-                    ** holding_penalty_power
-                )
-            return 0.0
+
+            if duration_ratio < 1.0:
+                return 0.0
+
+            return (
+                -holding_factor
+                * holding_penalty_scale
+                * (duration_ratio - 1.0) ** holding_penalty_power
+            )
 
         # close long
         if action == Actions.Long_exit.value and self._position == Positions.Long: