]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
refactor(reward): centralize piecewise divisor, improve idle fallback, record param...
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 6 Oct 2025 12:27:49 +0000 (14:27 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 6 Oct 2025 12:27:49 +0000 (14:27 +0200)
ReforceXY/reward_space_analysis/reward_space_analysis.py

index fa256e4252db7551ca606aa0db2beaedb79afa99..9c698fb56394eea363e18585c5faabdd98de867e 100644 (file)
@@ -23,6 +23,7 @@ import dataclasses
 import math
 import pickle
 import random
+import warnings
 from enum import Enum, IntEnum
 from pathlib import Path
 from typing import Any, Dict, Iterable, List, Optional, Tuple
@@ -86,6 +87,25 @@ def _get_param_float(params: Dict[str, float | str], key: str, default: float) -
     return default
 
 
+def _piecewise_duration_divisor(
+    duration_ratio: float, params: Dict[str, float | str]
+) -> float:
+    """Compute divisor for piecewise attenuation (single source of truth).
+
+    Ensures consistent fallback behaviour across the primary code path and
+    exception fallback in ``_get_exit_factor`` without duplicating logic.
+    """
+    exit_piecewise_grace = _get_param_float(params, "exit_piecewise_grace", 1.0)
+    if not (0.0 <= exit_piecewise_grace <= 1.0):  # sanitize grace range
+        exit_piecewise_grace = 1.0
+    exit_piecewise_slope = _get_param_float(params, "exit_piecewise_slope", 1.0)
+    if exit_piecewise_slope < 0.0:  # sanitize slope sign
+        exit_piecewise_slope = 1.0
+    if duration_ratio <= exit_piecewise_grace:
+        return 1.0
+    return 1.0 + exit_piecewise_slope * (duration_ratio - exit_piecewise_grace)
+
+
 def _compute_duration_ratio(trade_duration: int, max_trade_duration: int) -> float:
     """Compute duration ratio with safe division."""
     return trade_duration / max(1, max_trade_duration)
@@ -109,6 +129,8 @@ DEFAULT_MODEL_REWARD_PARAMETERS: Dict[str, float | str] = {
     # Idle penalty (env defaults)
     "idle_penalty_power": 1.0,
     "idle_penalty_scale": 1.0,
+    # If <=0 or unset, falls back to max_trade_duration_candles at runtime
+    "max_idle_duration_candles": 0,
     # Holding keys (env defaults)
     "holding_penalty_scale": 0.5,
     "holding_penalty_power": 1.0,
@@ -125,6 +147,9 @@ DEFAULT_MODEL_REWARD_PARAMETERS: Dict[str, float | str] = {
     # Profit factor params (env defaults)
     "win_reward_factor": 2.0,
     "pnl_factor_beta": 0.5,
+    # Invariant / safety controls (env defaults)
+    "check_invariants": True,
+    "exit_factor_threshold": 10000.0,
 }
 
 DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = {
@@ -132,6 +157,7 @@ DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = {
     "base_factor": "Base reward factor used inside the environment.",
     "idle_penalty_power": "Power applied to idle penalty scaling.",
     "idle_penalty_scale": "Scale of idle penalty.",
+    "max_idle_duration_candles": "Maximum idle duration candles before full idle penalty scaling; 0 = use max_trade_duration_candles.",
     "holding_penalty_scale": "Scale of holding penalty.",
     "holding_penalty_power": "Power applied to holding penalty scaling.",
     "exit_factor_mode": "Time attenuation mode for exit factor.",
@@ -144,6 +170,8 @@ DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = {
     "efficiency_center": "Center for efficiency factor sigmoid.",
     "win_reward_factor": "Amplification for pnl above target.",
     "pnl_factor_beta": "Sensitivity of amplification around target.",
+    "check_invariants": "Boolean flag (true/false) to enable runtime invariant & safety checks.",
+    "exit_factor_threshold": "If |exit factor| exceeds this threshold, emit warning.",
 }
 
 
@@ -157,6 +185,7 @@ _PARAMETER_BOUNDS: Dict[str, Dict[str, float]] = {
     "base_factor": {"min": 0.0},
     "idle_penalty_power": {"min": 0.0},
     "idle_penalty_scale": {"min": 0.0},
+    "max_idle_duration_candles": {"min": 0.0},
     "holding_penalty_scale": {"min": 0.0},
     "holding_penalty_power": {"min": 0.0},
     "exit_linear_slope": {"min": 0.0},
@@ -208,10 +237,8 @@ def validate_reward_parameters(
             adjustments[key] = {
                 "original": original,
                 "adjusted": adjusted,
-                "reason": float("nan"),  # placeholder for JSON compatibility
+                "reason": ",".join(reason_parts),  # textual reason directly
             }
-            # store textual reason separately to avoid type mixing
-            adjustments[key]["_reason_text"] = ",".join(reason_parts)
     return sanitized, adjustments
 
 
@@ -278,20 +305,43 @@ def _get_exit_factor(
     duration_ratio: float,
     params: Dict[str, float | str],
 ) -> float:
-    """Compute the complete exit factor including time-based attenuation and PnL factor.
-
-    This mirrors ReforceXY._get_exit_factor() exactly:
-    1. Apply time-based attenuation to base factor
-    2. Multiply by pnl_factor at the end
-
-    Exit factor mode formulas (explicit clarification for auditability):
-    - legacy: factor *= 1.5 if duration_ratio <= 1 else *= 0.5
-    - sqrt:   factor /= sqrt(1 + duration_ratio)
-    - linear: factor /= (1 + slope * duration_ratio)
-    - power:  alpha = -log(tau)/log(2) with tau in (0,1]; factor /= (1 + duration_ratio)^alpha
-    - piecewise: grace region g in [0,1]; factor /= 1 if d<=g else (1 + slope*(d-g))
-    - half_life: factor *= 2^(-duration_ratio/half_life)
+    """Compute the complete exit factor (time attenuation + PnL scaling).
+
+    Synchronization
+    ---------------
+    Mirrors the environment implementation `ReforceXY._get_exit_factor` (parity date: 2025-10-06).
+    Any upstream change MUST be ported here to keep analytical results consistent with live behavior.
+
+    Processing steps
+    ----------------
+    1. Clamp negative ``duration_ratio`` to 0.
+    2. Apply time-based attenuation according to ``exit_factor_mode``.
+    3. Multiply by ``pnl_factor`` (already includes profit amplification & efficiency component).
+    4. Enforce invariants (finite; non-negative when pnl>=0) and optionally emit a warning if
+       ``|factor| > exit_factor_threshold`` (warning-only, no capping).
+
+    Modes (attenuation formulas)
+    ----------------------------
+    legacy    : factor *= 1.5 if r <= 1 else *= 0.5 (step change)
+    sqrt      : factor /= sqrt(1 + r)
+    linear    : factor /= (1 + slope * r)
+    power     : alpha = -log(tau)/log(2) with tau in (0,1]; factor /= (1 + r)^alpha (fallback alpha=1)
+    piecewise : grace g in [0,1]; if r <= g then divisor=1 else divisor = 1 + slope * (r - g)
+    half_life : factor *= 2^(-r / half_life)
+
+    Fallback: Unknown modes default to piecewise.
+
+    Invariants
+    ----------
+    - Factor set to 0 if non-finite.
+    - Factor clamped to >=0 when pnl >= 0.
+    - Threshold exceedance triggers RuntimeWarning only.
+
+    Returns
+    -------
+    float : attenuated & pnl-scaled factor (reward exit component = pnl * factor).
     """
+    # Basic finiteness checks
     if (
         not math.isfinite(factor)
         or not math.isfinite(pnl)
@@ -299,57 +349,76 @@ def _get_exit_factor(
     ):
         return 0.0
 
-    exit_factor_mode = str(params.get("exit_factor_mode", "piecewise")).lower()
+    # Guard: duration ratio should never be negative
+    if duration_ratio < 0.0:
+        duration_ratio = 0.0
 
-    if exit_factor_mode == "legacy":
-        if duration_ratio <= 1.0:
-            factor *= 1.5
-        else:
-            factor *= 0.5
-    elif exit_factor_mode == "sqrt":
-        factor /= math.sqrt(1.0 + duration_ratio)
-    elif exit_factor_mode == "linear":
-        slope = _get_param_float(params, "exit_linear_slope", 1.0)
-        if slope < 0.0:
-            slope = 1.0
-        factor /= 1.0 + slope * duration_ratio
-    elif exit_factor_mode == "power":
-        exit_power_alpha = params.get("exit_power_alpha")
-        if isinstance(exit_power_alpha, (int, float)) and exit_power_alpha < 0.0:
-            exit_power_alpha = None
-        if exit_power_alpha is None:
-            exit_power_tau = params.get("exit_power_tau")
-            if isinstance(exit_power_tau, (int, float)):
-                exit_power_tau = float(exit_power_tau)
-                if 0.0 < exit_power_tau <= 1.0:
-                    exit_power_alpha = -math.log(exit_power_tau) / _LOG_2
-        if not isinstance(exit_power_alpha, (int, float)):
-            exit_power_alpha = 1.0
-        else:
-            exit_power_alpha = float(exit_power_alpha)
-        factor /= math.pow(1.0 + duration_ratio, exit_power_alpha)
-    elif exit_factor_mode == "piecewise":
-        exit_piecewise_grace = _get_param_float(params, "exit_piecewise_grace", 1.0)
-        if not (0.0 <= exit_piecewise_grace <= 1.0):
-            exit_piecewise_grace = 1.0
-        exit_piecewise_slope = _get_param_float(params, "exit_piecewise_slope", 1.0)
-        if exit_piecewise_slope < 0.0:
-            exit_piecewise_slope = 1.0
-        if duration_ratio <= exit_piecewise_grace:
-            duration_divisor = 1.0
-        else:
-            duration_divisor = 1.0 + exit_piecewise_slope * (
-                duration_ratio - exit_piecewise_grace
-            )
-        factor /= duration_divisor
-    elif exit_factor_mode == "half_life":
-        exit_half_life = _get_param_float(params, "exit_half_life", 0.5)
-        if exit_half_life <= 0.0:
-            exit_half_life = 0.5
-        factor *= math.pow(2.0, -duration_ratio / exit_half_life)
+    exit_factor_mode = str(params.get("exit_factor_mode", "piecewise")).lower()
 
+    try:
+        if exit_factor_mode == "legacy":
+            factor *= 1.5 if duration_ratio <= 1.0 else 0.5
+        elif exit_factor_mode == "sqrt":
+            factor /= math.sqrt(1.0 + duration_ratio)
+        elif exit_factor_mode == "linear":
+            slope = _get_param_float(params, "exit_linear_slope", 1.0)
+            if slope < 0.0:
+                slope = 1.0
+            factor /= 1.0 + slope * duration_ratio
+        elif exit_factor_mode == "power":
+            exit_power_alpha = params.get("exit_power_alpha")
+            if isinstance(exit_power_alpha, (int, float)) and exit_power_alpha < 0.0:
+                exit_power_alpha = None
+            if exit_power_alpha is None:
+                exit_power_tau = params.get("exit_power_tau")
+                if isinstance(exit_power_tau, (int, float)):
+                    exit_power_tau = float(exit_power_tau)
+                    if 0.0 < exit_power_tau <= 1.0:
+                        exit_power_alpha = -math.log(exit_power_tau) / _LOG_2
+            if not isinstance(exit_power_alpha, (int, float)):
+                exit_power_alpha = 1.0
+            else:
+                exit_power_alpha = float(exit_power_alpha)
+            factor /= math.pow(1.0 + duration_ratio, exit_power_alpha)
+        elif exit_factor_mode == "piecewise" or exit_factor_mode not in {
+            "legacy",
+            "sqrt",
+            "linear",
+            "power",
+            "half_life",
+        }:
+            # Default & fallback behaviour consolidated
+            factor /= _piecewise_duration_divisor(duration_ratio, params)
+        elif exit_factor_mode == "half_life":
+            exit_half_life = _get_param_float(params, "exit_half_life", 0.5)
+            if exit_half_life <= 0.0:
+                exit_half_life = 0.5
+            factor *= math.pow(2.0, -duration_ratio / exit_half_life)
+    except Exception:
+        # Safe fallback to piecewise logic if any unexpected error arises (centralized)
+        factor /= _piecewise_duration_divisor(duration_ratio, params)
+
+    # Apply pnl_factor after time attenuation
     factor *= pnl_factor
 
+    # Invariant & safety checks
+    if _to_bool(params.get("check_invariants", True)):
+        if not math.isfinite(factor):
+            return 0.0
+        if factor < 0.0 and pnl >= 0.0:
+            # Clamp: avoid negative amplification on non-negative pnl
+            factor = 0.0
+        thr = params.get("exit_factor_threshold")
+        if isinstance(thr, (int, float)) and thr > 0 and math.isfinite(thr):
+            if abs(factor) > thr:
+                warnings.warn(
+                    (
+                        f"_get_exit_factor |factor|={abs(factor):.2f} exceeds threshold {thr:.2f}"
+                    ),
+                    RuntimeWarning,
+                    stacklevel=2,
+                )
+
     return factor
 
 
@@ -418,12 +487,16 @@ def _idle_penalty(
     """Mirror the environment's idle penalty behaviour."""
     idle_penalty_scale = _get_param_float(params, "idle_penalty_scale", 1.0)
     idle_penalty_power = _get_param_float(params, "idle_penalty_power", 1.0)
-    max_idle_duration = int(
+    max_idle_duration_cfg = int(
         params.get(
-            "max_idle_duration_candles",
-            max(1, context.max_trade_duration),
+            "max_idle_duration_candles", params.get("max_trade_duration_candles", 0)
         )
     )
+    # Fallback: align with documented intent -> use context.max_trade_duration when cfg <= 0
+    if max_idle_duration_cfg <= 0:
+        max_idle_duration = context.max_trade_duration
+    else:
+        max_idle_duration = max_idle_duration_cfg
     idle_duration_ratio = context.idle_duration / max(1, max_idle_duration)
     return -idle_factor * idle_penalty_scale * idle_duration_ratio**idle_penalty_power
 
@@ -459,15 +532,22 @@ def _compute_exit_reward(
         context.trade_duration, context.max_trade_duration
     )
     exit_factor = _get_exit_factor(
-        base_factor,
-        context.pnl,
-        pnl_factor,
-        duration_ratio,
-        params,
+        base_factor, context.pnl, pnl_factor, duration_ratio, params
     )
     return context.pnl * exit_factor
 
 
+def compute_exit_factor(
+    base_factor: float,
+    pnl: float,
+    pnl_factor: float,
+    duration_ratio: float,
+    params: Dict[str, float | str],
+) -> float:
+    """Public wrapper to compute the time-attenuated + pnl-scaled exit factor."""
+    return _get_exit_factor(base_factor, pnl, pnl_factor, duration_ratio, params)
+
+
 def calculate_reward(
     context: RewardContext,
     params: Dict[str, float | str],