]> Piment Noir Git Repositories - freqai-strategies.git/commitdiff
refactor(ReforceXY): simplify param helpers with auto-lookup defaults
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 25 Dec 2025 01:09:50 +0000 (02:09 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 25 Dec 2025 01:09:50 +0000 (02:09 +0100)
ReforceXY/reward_space_analysis/reward_space_analysis.py

index eee595dc57095ab392dc235b603196598693eb83..09561c3dabbbf44fb3ef5b7d19f87883d53ee5e2 100644 (file)
@@ -305,13 +305,24 @@ def _to_bool(value: Any) -> bool:
     raise ValueError(f"Unrecognized boolean literal: {value!r}")
 
 
-def _get_bool_param(params: RewardParams, key: str, default: bool) -> bool:
-    """Extract boolean parameter with type safety."""
+def _get_bool_param(params: RewardParams, key: str, default: Optional[bool] = None) -> bool:
+    """Extract boolean parameter with type safety.
+
+    Args:
+        params: Parameter dictionary to extract from.
+        key: Parameter key to look up.
+        default: Fallback value. If None, looks up from DEFAULT_MODEL_REWARD_PARAMETERS.
+
+    Returns:
+        Boolean value with fallback chain: params[key] -> default -> canonical default.
+    """
+    if default is None:
+        default = DEFAULT_MODEL_REWARD_PARAMETERS.get(key)
     value = params.get(key, default)
     try:
         return _to_bool(value)
     except Exception:
-        return bool(default)
+        return bool(default) if default is not None else False
 
 
 def _resolve_additive_enablement(
@@ -340,8 +351,21 @@ def _resolve_additive_enablement(
     return entry_additive_effective, exit_additive_effective, additives_suppressed
 
 
-def _get_float_param(params: RewardParams, key: str, default: RewardParamValue) -> float:
-    """Extract float parameter with type safety and default fallback."""
+def _get_float_param(
+    params: RewardParams, key: str, default: Optional[RewardParamValue] = None
+) -> float:
+    """Extract float parameter with type safety and default fallback.
+
+    Args:
+        params: Parameter dictionary to extract from.
+        key: Parameter key to look up.
+        default: Fallback value. If None, looks up from DEFAULT_MODEL_REWARD_PARAMETERS.
+
+    Returns:
+        Float value with fallback chain: params[key] -> default -> canonical default.
+    """
+    if default is None:
+        default = DEFAULT_MODEL_REWARD_PARAMETERS.get(key)
     value = params.get(key, default)
     # None -> NaN
     if value is None:
@@ -417,9 +441,16 @@ def _clamp_float_to_bounds(
     return adjusted, reason_parts
 
 
-def _get_int_param(params: RewardParams, key: str, default: RewardParamValue) -> int:
+def _get_int_param(
+    params: RewardParams, key: str, default: Optional[RewardParamValue] = None
+) -> int:
     """Extract integer parameter with robust coercion.
 
+    Args:
+        params: Parameter dictionary to extract from.
+        key: Parameter key to look up.
+        default: Fallback value. If None, looks up from DEFAULT_MODEL_REWARD_PARAMETERS.
+
     Behavior:
     - Accept bool/int/float/str numeric representations.
     - Non-finite floats -> fallback to default coerced to int (or 0).
@@ -427,6 +458,8 @@ def _get_int_param(params: RewardParams, key: str, default: RewardParamValue) ->
     - None -> fallback.
     - Final value is clamped to a signed 64-bit range implicitly by int().
     """
+    if default is None:
+        default = DEFAULT_MODEL_REWARD_PARAMETERS.get(key)
     value = params.get(key, default)
     if value is None:
         return int(default) if isinstance(default, (int, float)) else 0
@@ -458,8 +491,20 @@ def _get_int_param(params: RewardParams, key: str, default: RewardParamValue) ->
     return int(default) if isinstance(default, (int, float)) else 0
 
 
-def _get_str_param(params: RewardParams, key: str, default: str) -> str:
-    """Extract string parameter with type safety."""
+def _get_str_param(params: RewardParams, key: str, default: Optional[str] = None) -> str:
+    """Extract string parameter with type safety and default fallback.
+
+    Args:
+        params: Parameter dictionary to extract from.
+        key: Parameter key to look up.
+        default: Fallback value. If None, looks up from DEFAULT_MODEL_REWARD_PARAMETERS.
+
+    Returns:
+        String value with fallback chain: params[key] -> default -> canonical default.
+    """
+    if default is None:
+        default_val = DEFAULT_MODEL_REWARD_PARAMETERS.get(key)
+        default = str(default_val) if default_val is not None else ""
     value = params.get(key, default)
     if isinstance(value, str):
         return value
@@ -497,11 +542,7 @@ def get_max_idle_duration_candles(
         else None
     )
     if mtd is None or mtd <= 0:
-        mtd = _get_int_param(
-            params,
-            "max_trade_duration_candles",
-            DEFAULT_MODEL_REWARD_PARAMETERS.get("max_trade_duration_candles", 128),
-        )
+        mtd = _get_int_param(params, "max_trade_duration_candles")
         if mtd <= 0:
             mtd = int(DEFAULT_MODEL_REWARD_PARAMETERS.get("max_trade_duration_candles", 128))
 
@@ -743,22 +784,10 @@ def _compute_time_attenuation_coefficient(
     if duration_ratio < 0.0:
         duration_ratio = 0.0
 
-    exit_attenuation_mode = _get_str_param(
-        params,
-        "exit_attenuation_mode",
-        str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_attenuation_mode", "linear")),
-    )
-    exit_plateau = _get_bool_param(
-        params,
-        "exit_plateau",
-        bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_plateau", True)),
-    )
+    exit_attenuation_mode = _get_str_param(params, "exit_attenuation_mode")
+    exit_plateau = _get_bool_param(params, "exit_plateau")
 
-    exit_plateau_grace = _get_float_param(
-        params,
-        "exit_plateau_grace",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_plateau_grace", 1.0),
-    )
+    exit_plateau_grace = _get_float_param(params, "exit_plateau_grace")
     if exit_plateau_grace < 0.0:
         warnings.warn(
             "exit_plateau_grace < 0; falling back to 0.0",
@@ -766,11 +795,7 @@ def _compute_time_attenuation_coefficient(
             stacklevel=2,
         )
         exit_plateau_grace = 0.0
-    exit_linear_slope = _get_float_param(
-        params,
-        "exit_linear_slope",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_linear_slope", 1.0),
-    )
+    exit_linear_slope = _get_float_param(params, "exit_linear_slope")
     if exit_linear_slope < 0.0:
         warnings.warn(
             "exit_linear_slope < 0; falling back to 1.0",
@@ -806,11 +831,7 @@ def _compute_time_attenuation_coefficient(
         return 1.0 / math.pow(1.0 + dr, alpha)
 
     def _half_life_kernel(dr: float) -> float:
-        hl = _get_float_param(
-            params,
-            "exit_half_life",
-            DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_half_life", 0.5),
-        )
+        hl = _get_float_param(params, "exit_half_life")
         if np.isclose(hl, 0.0):
             warnings.warn(
                 f"exit_half_life={hl} close to 0; falling back to 1.0",
@@ -915,17 +936,12 @@ def _get_exit_factor(
     if _get_bool_param(
         params,
         "check_invariants",
-        bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("check_invariants", True)),
     ):
         if not np.isfinite(exit_factor):
             return _fail_safely("non_finite_exit_factor_after_kernel")
         if exit_factor < 0.0 and pnl >= 0.0:
             exit_factor = 0.0
-        exit_factor_threshold = _get_float_param(
-            params,
-            "exit_factor_threshold",
-            DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_factor_threshold", 1000.0),
-        )
+        exit_factor_threshold = _get_float_param(params, "exit_factor_threshold")
         if exit_factor_threshold > 0 and np.isfinite(exit_factor_threshold):
             if abs(exit_factor) > exit_factor_threshold:
                 warnings.warn(
@@ -964,16 +980,8 @@ def _compute_pnl_target_coefficient(
     pnl_target_coefficient = 1.0
 
     if pnl_target > 0.0:
-        win_reward_factor = _get_float_param(
-            params,
-            "win_reward_factor",
-            DEFAULT_MODEL_REWARD_PARAMETERS.get("win_reward_factor", 2.0),
-        )
-        pnl_factor_beta = _get_float_param(
-            params,
-            "pnl_factor_beta",
-            DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5),
-        )
+        win_reward_factor = _get_float_param(params, "win_reward_factor")
+        pnl_factor_beta = _get_float_param(params, "pnl_factor_beta")
         rr = risk_reward_ratio if risk_reward_ratio > 0 else 1.0
 
         pnl_ratio = pnl / pnl_target
@@ -1011,16 +1019,8 @@ def _compute_efficiency_coefficient(
         float: Coefficient ≥ 0.0 (typically 0.5-1.5 range)
     """
     efficiency_coefficient = 1.0
-    efficiency_weight = _get_float_param(
-        params,
-        "efficiency_weight",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("efficiency_weight", 1.0),
-    )
-    efficiency_center = _get_float_param(
-        params,
-        "efficiency_center",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("efficiency_center", 0.5),
-    )
+    efficiency_weight = _get_float_param(params, "efficiency_weight")
+    efficiency_center = _get_float_param(params, "efficiency_center")
     if efficiency_weight != 0.0 and not np.isclose(pnl, 0.0):
         max_pnl = max(context.max_unrealized_profit, pnl)
         min_pnl = min(context.min_unrealized_profit, pnl)
@@ -1040,11 +1040,7 @@ def _compute_efficiency_coefficient(
         efficiency_coefficient = 0.0
 
     if efficiency_coefficient < 0.0:
-        if _get_bool_param(
-            params,
-            "check_invariants",
-            bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("check_invariants", True)),
-        ):
+        if _get_bool_param(params, "check_invariants"):
             warnings.warn(
                 f"efficiency_coefficient={efficiency_coefficient:.6f} < 0; clamping to 0.0",
                 RewardDiagnosticsWarning,
@@ -1095,16 +1091,8 @@ def _get_next_position(
 
 def _idle_penalty(context: RewardContext, idle_factor: float, params: RewardParams) -> float:
     """Compute idle penalty."""
-    idle_penalty_ratio = _get_float_param(
-        params,
-        "idle_penalty_ratio",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("idle_penalty_ratio", 1.0),
-    )
-    idle_penalty_power = _get_float_param(
-        params,
-        "idle_penalty_power",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("idle_penalty_power", 1.025),
-    )
+    idle_penalty_ratio = _get_float_param(params, "idle_penalty_ratio")
+    idle_penalty_power = _get_float_param(params, "idle_penalty_power")
     max_idle_duration_candles = get_max_idle_duration_candles(params)
     idle_duration_ratio = context.idle_duration / max(1, max_idle_duration_candles)
     return -idle_factor * idle_penalty_ratio * idle_duration_ratio**idle_penalty_power
@@ -1112,21 +1100,9 @@ def _idle_penalty(context: RewardContext, idle_factor: float, params: RewardPara
 
 def _hold_penalty(context: RewardContext, hold_factor: float, params: RewardParams) -> float:
     """Compute hold penalty."""
-    hold_penalty_ratio = _get_float_param(
-        params,
-        "hold_penalty_ratio",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_penalty_ratio", 1.0),
-    )
-    hold_penalty_power = _get_float_param(
-        params,
-        "hold_penalty_power",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_penalty_power", 1.025),
-    )
-    max_trade_duration_candles = _get_int_param(
-        params,
-        "max_trade_duration_candles",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("max_trade_duration_candles", 128),
-    )
+    hold_penalty_ratio = _get_float_param(params, "hold_penalty_ratio")
+    hold_penalty_power = _get_float_param(params, "hold_penalty_power")
+    max_trade_duration_candles = _get_int_param(params, "max_trade_duration_candles")
     duration_ratio = _compute_duration_ratio(context.trade_duration, max_trade_duration_candles)
 
     if duration_ratio < 1.0:
@@ -1183,11 +1159,7 @@ def calculate_reward(
 
     base_reward: Optional[float] = None
     if not is_valid and not action_masking:
-        breakdown.invalid_penalty = _get_float_param(
-            params,
-            "invalid_action",
-            DEFAULT_MODEL_REWARD_PARAMETERS.get("invalid_action", -2.0),
-        )
+        breakdown.invalid_penalty = _get_float_param(params, "invalid_action")
         base_reward = breakdown.invalid_penalty
 
     base_factor = _get_float_param(params, "base_factor", base_factor)
@@ -1205,11 +1177,7 @@ def calculate_reward(
     idle_factor = base_factor * (profit_aim / risk_reward_ratio)
     hold_factor = idle_factor
 
-    max_trade_duration_candles = _get_int_param(
-        params,
-        "max_trade_duration_candles",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("max_trade_duration_candles", 128),
-    )
+    max_trade_duration_candles = _get_int_param(params, "max_trade_duration_candles")
     current_duration_ratio = _compute_duration_ratio(
         context.trade_duration, max_trade_duration_candles
     )
@@ -1295,11 +1263,7 @@ def calculate_reward(
             center_unrealized = 0.5 * (
                 context.max_unrealized_profit + context.min_unrealized_profit
             )
-            beta = _get_float_param(
-                params,
-                "pnl_factor_beta",
-                DEFAULT_MODEL_REWARD_PARAMETERS.get("pnl_factor_beta", 0.5),
-            )
+            beta = _get_float_param(params, "pnl_factor_beta")
             next_pnl = float(center_unrealized * math.tanh(beta * next_duration_ratio))
         else:
             next_pnl = current_pnl
@@ -1311,34 +1275,14 @@ def calculate_reward(
         next_duration_ratio = current_duration_ratio
 
     # Apply PBRS only if enabled and not neutral self-loop
-    exit_mode = _get_str_param(
-        params,
-        "exit_potential_mode",
-        str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")),
-    )
+    exit_mode = _get_str_param(params, "exit_potential_mode")
 
-    hold_potential_enabled = _get_bool_param(
-        params,
-        "hold_potential_enabled",
-        bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_potential_enabled", True)),
-    )
+    hold_potential_enabled = _get_bool_param(params, "hold_potential_enabled")
     entry_additive_enabled = (
-        False
-        if exit_mode == "canonical"
-        else _get_bool_param(
-            params,
-            "entry_additive_enabled",
-            bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("entry_additive_enabled", False)),
-        )
+        False if exit_mode == "canonical" else _get_bool_param(params, "entry_additive_enabled")
     )
     exit_additive_enabled = (
-        False
-        if exit_mode == "canonical"
-        else _get_bool_param(
-            params,
-            "exit_additive_enabled",
-            bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_additive_enabled", False)),
-        )
+        False if exit_mode == "canonical" else _get_bool_param(params, "exit_additive_enabled")
     )
 
     pbrs_enabled = bool(hold_potential_enabled or entry_additive_enabled or exit_additive_enabled)
@@ -1540,30 +1484,14 @@ def simulate_samples(
     """
 
     rng = random.Random(seed)
-    max_trade_duration_candles = _get_int_param(
-        params,
-        "max_trade_duration_candles",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("max_trade_duration_candles", 128),
-    )
+    max_trade_duration_candles = _get_int_param(params, "max_trade_duration_candles")
     short_allowed = _is_short_allowed(trading_mode)
     action_masking = _get_bool_param(params, "action_masking", True)
 
     # Theoretical PBRS invariance flag
-    exit_mode = _get_str_param(
-        params,
-        "exit_potential_mode",
-        str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")),
-    )
-    entry_enabled_raw = _get_bool_param(
-        params,
-        "entry_additive_enabled",
-        bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("entry_additive_enabled", False)),
-    )
-    exit_enabled_raw = _get_bool_param(
-        params,
-        "exit_additive_enabled",
-        bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_additive_enabled", False)),
-    )
+    exit_mode = _get_str_param(params, "exit_potential_mode")
+    entry_enabled_raw = _get_bool_param(params, "entry_additive_enabled")
+    exit_enabled_raw = _get_bool_param(params, "exit_additive_enabled")
 
     entry_enabled, exit_enabled, _additives_suppressed = _resolve_additive_enablement(
         exit_mode,
@@ -1908,11 +1836,7 @@ def _compute_relationship_stats(df: pd.DataFrame) -> Dict[str, Any]:
         if isinstance(df.attrs.get("reward_params"), dict)
         else {}
     )
-    max_trade_duration_candles = _get_int_param(
-        reward_params,
-        "max_trade_duration_candles",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("max_trade_duration_candles", 128),
-    )
+    max_trade_duration_candles = _get_int_param(reward_params, "max_trade_duration_candles")
     idle_bins = np.linspace(0, max_trade_duration_candles * 3.0, 13)
     trade_bins = np.linspace(0, max_trade_duration_candles * 3.0, 13)
     pnl_min = float(df["pnl"].min())
@@ -3034,16 +2958,8 @@ def _get_fee_rates(params: RewardParams) -> tuple[float, float]:
     pre-run `validate_reward_parameters()`.
     """
 
-    raw_entry_fee_rate = _get_float_param(
-        params,
-        "entry_fee_rate",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("entry_fee_rate", 0.0),
-    )
-    raw_exit_fee_rate = _get_float_param(
-        params,
-        "exit_fee_rate",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_fee_rate", 0.0),
-    )
+    raw_entry_fee_rate = _get_float_param(params, "entry_fee_rate")
+    raw_exit_fee_rate = _get_float_param(params, "exit_fee_rate")
 
     entry_fee_rate, _ = _clamp_float_to_bounds(
         "entry_fee_rate",
@@ -3137,11 +3053,7 @@ def _compute_hold_potential(
     base_factor: float,
 ) -> float:
     """Compute PBRS hold potential Φ(s)."""
-    if not _get_bool_param(
-        params,
-        "hold_potential_enabled",
-        bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_potential_enabled", True)),
-    ):
+    if not _get_bool_param(params, "hold_potential_enabled"):
         return _fail_safely("hold_potential_disabled")
 
     return _compute_bi_component(
@@ -3167,11 +3079,7 @@ def _compute_entry_additive(
     params: RewardParams,
     base_factor: float,
 ) -> float:
-    if not _get_bool_param(
-        params,
-        "entry_additive_enabled",
-        bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("entry_additive_enabled", False)),
-    ):
+    if not _get_bool_param(params, "entry_additive_enabled"):
         return _fail_safely("entry_additive_disabled")
     return _compute_bi_component(
         kind="entry_additive",
@@ -3195,11 +3103,7 @@ def _compute_exit_additive(
     params: RewardParams,
     base_factor: float,
 ) -> float:
-    if not _get_bool_param(
-        params,
-        "exit_additive_enabled",
-        bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_additive_enabled", False)),
-    ):
+    if not _get_bool_param(params, "exit_additive_enabled"):
         return _fail_safely("exit_additive_disabled")
     return _compute_bi_component(
         kind="exit_additive",
@@ -3218,20 +3122,12 @@ def _compute_exit_additive(
 
 def _compute_exit_potential(prev_potential: float, params: RewardParams) -> float:
     """Exit potential per mode (canonical/non_canonical -> 0; others transform Φ(prev))."""
-    mode = _get_str_param(
-        params,
-        "exit_potential_mode",
-        str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")),
-    )
+    mode = _get_str_param(params, "exit_potential_mode")
     if mode == "canonical" or mode == "non_canonical":
         return _fail_safely("canonical_exit_potential")
 
     if mode == "progressive_release":
-        decay = _get_float_param(
-            params,
-            "exit_potential_decay",
-            DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_decay", 0.5),
-        )
+        decay = _get_float_param(params, "exit_potential_decay")
         if not np.isfinite(decay) or decay < 0.0:
             warnings.warn(
                 "exit_potential_decay invalid or < 0; falling back to 0.0",
@@ -3312,18 +3208,10 @@ def compute_pbrs_components(
 
     prev_potential = float(prev_potential) if np.isfinite(prev_potential) else 0.0
 
-    exit_mode = _get_str_param(
-        params,
-        "exit_potential_mode",
-        str(DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical")),
-    )
+    exit_mode = _get_str_param(params, "exit_potential_mode")
     canonical_mode = exit_mode == "canonical"
 
-    hold_potential_enabled = _get_bool_param(
-        params,
-        "hold_potential_enabled",
-        bool(DEFAULT_MODEL_REWARD_PARAMETERS.get("hold_potential_enabled", True)),
-    )
+    hold_potential_enabled = _get_bool_param(params, "hold_potential_enabled")
 
     if is_exit:
         next_potential = _compute_exit_potential(prev_potential, params)
@@ -3662,11 +3550,7 @@ def write_complete_statistical_analysis(
         if isinstance(df.attrs.get("reward_params"), dict)
         else {}
     )
-    max_trade_duration_candles = _get_int_param(
-        reward_params,
-        "max_trade_duration_candles",
-        DEFAULT_MODEL_REWARD_PARAMETERS.get("max_trade_duration_candles", 128),
-    )
+    max_trade_duration_candles = _get_int_param(reward_params, "max_trade_duration_candles")
 
     # Helpers: consistent Markdown table renderers
     def _fmt_val(v: Any, ndigits: int = 6) -> str:
@@ -3809,11 +3693,7 @@ def write_complete_statistical_analysis(
             if isinstance(df.attrs.get("reward_params"), dict)
             else {}
         )
-        exit_mode = _get_str_param(
-            reward_params,
-            "exit_potential_mode",
-            DEFAULT_MODEL_REWARD_PARAMETERS.get("exit_potential_mode", "canonical"),
-        )
+        exit_mode = _get_str_param(reward_params, "exit_potential_mode")
         potential_gamma = _get_potential_gamma(reward_params)
         f.write(f"| exit_potential_mode | {exit_mode} |\n")
         f.write(f"| potential_gamma | {potential_gamma} |\n")