From: Jérôme Benoit Date: Mon, 6 Oct 2025 12:27:49 +0000 (+0200) Subject: refactor(reward): centralize piecewise divisor, improve idle fallback, record param... X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=17649d4da6a210369e1935b8288401bfbeba6751;p=freqai-strategies.git refactor(reward): centralize piecewise divisor, improve idle fallback, record param adjustment reasons --- diff --git a/ReforceXY/reward_space_analysis/reward_space_analysis.py b/ReforceXY/reward_space_analysis/reward_space_analysis.py index fa256e4..9c698fb 100644 --- a/ReforceXY/reward_space_analysis/reward_space_analysis.py +++ b/ReforceXY/reward_space_analysis/reward_space_analysis.py @@ -23,6 +23,7 @@ import dataclasses import math import pickle import random +import warnings from enum import Enum, IntEnum from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Tuple @@ -86,6 +87,25 @@ def _get_param_float(params: Dict[str, float | str], key: str, default: float) - return default +def _piecewise_duration_divisor( + duration_ratio: float, params: Dict[str, float | str] +) -> float: + """Compute divisor for piecewise attenuation (single source of truth). + + Ensures consistent fallback behaviour across the primary code path and + exception fallback in ``_get_exit_factor`` without duplicating logic. + """ + exit_piecewise_grace = _get_param_float(params, "exit_piecewise_grace", 1.0) + if not (0.0 <= exit_piecewise_grace <= 1.0): # sanitize grace range + exit_piecewise_grace = 1.0 + exit_piecewise_slope = _get_param_float(params, "exit_piecewise_slope", 1.0) + if exit_piecewise_slope < 0.0: # sanitize slope sign + exit_piecewise_slope = 1.0 + if duration_ratio <= exit_piecewise_grace: + return 1.0 + return 1.0 + exit_piecewise_slope * (duration_ratio - exit_piecewise_grace) + + def _compute_duration_ratio(trade_duration: int, max_trade_duration: int) -> float: """Compute duration ratio with safe division.""" return trade_duration / max(1, max_trade_duration) @@ -109,6 +129,8 @@ DEFAULT_MODEL_REWARD_PARAMETERS: Dict[str, float | str] = { # Idle penalty (env defaults) "idle_penalty_power": 1.0, "idle_penalty_scale": 1.0, + # If <=0 or unset, falls back to max_trade_duration_candles at runtime + "max_idle_duration_candles": 0, # Holding keys (env defaults) "holding_penalty_scale": 0.5, "holding_penalty_power": 1.0, @@ -125,6 +147,9 @@ DEFAULT_MODEL_REWARD_PARAMETERS: Dict[str, float | str] = { # Profit factor params (env defaults) "win_reward_factor": 2.0, "pnl_factor_beta": 0.5, + # Invariant / safety controls (env defaults) + "check_invariants": True, + "exit_factor_threshold": 10000.0, } DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = { @@ -132,6 +157,7 @@ DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = { "base_factor": "Base reward factor used inside the environment.", "idle_penalty_power": "Power applied to idle penalty scaling.", "idle_penalty_scale": "Scale of idle penalty.", + "max_idle_duration_candles": "Maximum idle duration candles before full idle penalty scaling; 0 = use max_trade_duration_candles.", "holding_penalty_scale": "Scale of holding penalty.", "holding_penalty_power": "Power applied to holding penalty scaling.", "exit_factor_mode": "Time attenuation mode for exit factor.", @@ -144,6 +170,8 @@ DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = { "efficiency_center": "Center for efficiency factor sigmoid.", "win_reward_factor": "Amplification for pnl above target.", "pnl_factor_beta": "Sensitivity of amplification around target.", + "check_invariants": "Boolean flag (true/false) to enable runtime invariant & safety checks.", + "exit_factor_threshold": "If |exit factor| exceeds this threshold, emit warning.", } @@ -157,6 +185,7 @@ _PARAMETER_BOUNDS: Dict[str, Dict[str, float]] = { "base_factor": {"min": 0.0}, "idle_penalty_power": {"min": 0.0}, "idle_penalty_scale": {"min": 0.0}, + "max_idle_duration_candles": {"min": 0.0}, "holding_penalty_scale": {"min": 0.0}, "holding_penalty_power": {"min": 0.0}, "exit_linear_slope": {"min": 0.0}, @@ -208,10 +237,8 @@ def validate_reward_parameters( adjustments[key] = { "original": original, "adjusted": adjusted, - "reason": float("nan"), # placeholder for JSON compatibility + "reason": ",".join(reason_parts), # textual reason directly } - # store textual reason separately to avoid type mixing - adjustments[key]["_reason_text"] = ",".join(reason_parts) return sanitized, adjustments @@ -278,20 +305,43 @@ def _get_exit_factor( duration_ratio: float, params: Dict[str, float | str], ) -> float: - """Compute the complete exit factor including time-based attenuation and PnL factor. - - This mirrors ReforceXY._get_exit_factor() exactly: - 1. Apply time-based attenuation to base factor - 2. Multiply by pnl_factor at the end - - Exit factor mode formulas (explicit clarification for auditability): - - legacy: factor *= 1.5 if duration_ratio <= 1 else *= 0.5 - - sqrt: factor /= sqrt(1 + duration_ratio) - - linear: factor /= (1 + slope * duration_ratio) - - power: alpha = -log(tau)/log(2) with tau in (0,1]; factor /= (1 + duration_ratio)^alpha - - piecewise: grace region g in [0,1]; factor /= 1 if d<=g else (1 + slope*(d-g)) - - half_life: factor *= 2^(-duration_ratio/half_life) + """Compute the complete exit factor (time attenuation + PnL scaling). + + Synchronization + --------------- + Mirrors the environment implementation `ReforceXY._get_exit_factor` (parity date: 2025-10-06). + Any upstream change MUST be ported here to keep analytical results consistent with live behavior. + + Processing steps + ---------------- + 1. Clamp negative ``duration_ratio`` to 0. + 2. Apply time-based attenuation according to ``exit_factor_mode``. + 3. Multiply by ``pnl_factor`` (already includes profit amplification & efficiency component). + 4. Enforce invariants (finite; non-negative when pnl>=0) and optionally emit a warning if + ``|factor| > exit_factor_threshold`` (warning-only, no capping). + + Modes (attenuation formulas) + ---------------------------- + legacy : factor *= 1.5 if r <= 1 else *= 0.5 (step change) + sqrt : factor /= sqrt(1 + r) + linear : factor /= (1 + slope * r) + power : alpha = -log(tau)/log(2) with tau in (0,1]; factor /= (1 + r)^alpha (fallback alpha=1) + piecewise : grace g in [0,1]; if r <= g then divisor=1 else divisor = 1 + slope * (r - g) + half_life : factor *= 2^(-r / half_life) + + Fallback: Unknown modes default to piecewise. + + Invariants + ---------- + - Factor set to 0 if non-finite. + - Factor clamped to >=0 when pnl >= 0. + - Threshold exceedance triggers RuntimeWarning only. + + Returns + ------- + float : attenuated & pnl-scaled factor (reward exit component = pnl * factor). """ + # Basic finiteness checks if ( not math.isfinite(factor) or not math.isfinite(pnl) @@ -299,57 +349,76 @@ def _get_exit_factor( ): return 0.0 - exit_factor_mode = str(params.get("exit_factor_mode", "piecewise")).lower() + # Guard: duration ratio should never be negative + if duration_ratio < 0.0: + duration_ratio = 0.0 - if exit_factor_mode == "legacy": - if duration_ratio <= 1.0: - factor *= 1.5 - else: - factor *= 0.5 - elif exit_factor_mode == "sqrt": - factor /= math.sqrt(1.0 + duration_ratio) - elif exit_factor_mode == "linear": - slope = _get_param_float(params, "exit_linear_slope", 1.0) - if slope < 0.0: - slope = 1.0 - factor /= 1.0 + slope * duration_ratio - elif exit_factor_mode == "power": - exit_power_alpha = params.get("exit_power_alpha") - if isinstance(exit_power_alpha, (int, float)) and exit_power_alpha < 0.0: - exit_power_alpha = None - if exit_power_alpha is None: - exit_power_tau = params.get("exit_power_tau") - if isinstance(exit_power_tau, (int, float)): - exit_power_tau = float(exit_power_tau) - if 0.0 < exit_power_tau <= 1.0: - exit_power_alpha = -math.log(exit_power_tau) / _LOG_2 - if not isinstance(exit_power_alpha, (int, float)): - exit_power_alpha = 1.0 - else: - exit_power_alpha = float(exit_power_alpha) - factor /= math.pow(1.0 + duration_ratio, exit_power_alpha) - elif exit_factor_mode == "piecewise": - exit_piecewise_grace = _get_param_float(params, "exit_piecewise_grace", 1.0) - if not (0.0 <= exit_piecewise_grace <= 1.0): - exit_piecewise_grace = 1.0 - exit_piecewise_slope = _get_param_float(params, "exit_piecewise_slope", 1.0) - if exit_piecewise_slope < 0.0: - exit_piecewise_slope = 1.0 - if duration_ratio <= exit_piecewise_grace: - duration_divisor = 1.0 - else: - duration_divisor = 1.0 + exit_piecewise_slope * ( - duration_ratio - exit_piecewise_grace - ) - factor /= duration_divisor - elif exit_factor_mode == "half_life": - exit_half_life = _get_param_float(params, "exit_half_life", 0.5) - if exit_half_life <= 0.0: - exit_half_life = 0.5 - factor *= math.pow(2.0, -duration_ratio / exit_half_life) + exit_factor_mode = str(params.get("exit_factor_mode", "piecewise")).lower() + try: + if exit_factor_mode == "legacy": + factor *= 1.5 if duration_ratio <= 1.0 else 0.5 + elif exit_factor_mode == "sqrt": + factor /= math.sqrt(1.0 + duration_ratio) + elif exit_factor_mode == "linear": + slope = _get_param_float(params, "exit_linear_slope", 1.0) + if slope < 0.0: + slope = 1.0 + factor /= 1.0 + slope * duration_ratio + elif exit_factor_mode == "power": + exit_power_alpha = params.get("exit_power_alpha") + if isinstance(exit_power_alpha, (int, float)) and exit_power_alpha < 0.0: + exit_power_alpha = None + if exit_power_alpha is None: + exit_power_tau = params.get("exit_power_tau") + if isinstance(exit_power_tau, (int, float)): + exit_power_tau = float(exit_power_tau) + if 0.0 < exit_power_tau <= 1.0: + exit_power_alpha = -math.log(exit_power_tau) / _LOG_2 + if not isinstance(exit_power_alpha, (int, float)): + exit_power_alpha = 1.0 + else: + exit_power_alpha = float(exit_power_alpha) + factor /= math.pow(1.0 + duration_ratio, exit_power_alpha) + elif exit_factor_mode == "piecewise" or exit_factor_mode not in { + "legacy", + "sqrt", + "linear", + "power", + "half_life", + }: + # Default & fallback behaviour consolidated + factor /= _piecewise_duration_divisor(duration_ratio, params) + elif exit_factor_mode == "half_life": + exit_half_life = _get_param_float(params, "exit_half_life", 0.5) + if exit_half_life <= 0.0: + exit_half_life = 0.5 + factor *= math.pow(2.0, -duration_ratio / exit_half_life) + except Exception: + # Safe fallback to piecewise logic if any unexpected error arises (centralized) + factor /= _piecewise_duration_divisor(duration_ratio, params) + + # Apply pnl_factor after time attenuation factor *= pnl_factor + # Invariant & safety checks + if _to_bool(params.get("check_invariants", True)): + if not math.isfinite(factor): + return 0.0 + if factor < 0.0 and pnl >= 0.0: + # Clamp: avoid negative amplification on non-negative pnl + factor = 0.0 + thr = params.get("exit_factor_threshold") + if isinstance(thr, (int, float)) and thr > 0 and math.isfinite(thr): + if abs(factor) > thr: + warnings.warn( + ( + f"_get_exit_factor |factor|={abs(factor):.2f} exceeds threshold {thr:.2f}" + ), + RuntimeWarning, + stacklevel=2, + ) + return factor @@ -418,12 +487,16 @@ def _idle_penalty( """Mirror the environment's idle penalty behaviour.""" idle_penalty_scale = _get_param_float(params, "idle_penalty_scale", 1.0) idle_penalty_power = _get_param_float(params, "idle_penalty_power", 1.0) - max_idle_duration = int( + max_idle_duration_cfg = int( params.get( - "max_idle_duration_candles", - max(1, context.max_trade_duration), + "max_idle_duration_candles", params.get("max_trade_duration_candles", 0) ) ) + # Fallback: align with documented intent -> use context.max_trade_duration when cfg <= 0 + if max_idle_duration_cfg <= 0: + max_idle_duration = context.max_trade_duration + else: + max_idle_duration = max_idle_duration_cfg idle_duration_ratio = context.idle_duration / max(1, max_idle_duration) return -idle_factor * idle_penalty_scale * idle_duration_ratio**idle_penalty_power @@ -459,15 +532,22 @@ def _compute_exit_reward( context.trade_duration, context.max_trade_duration ) exit_factor = _get_exit_factor( - base_factor, - context.pnl, - pnl_factor, - duration_ratio, - params, + base_factor, context.pnl, pnl_factor, duration_ratio, params ) return context.pnl * exit_factor +def compute_exit_factor( + base_factor: float, + pnl: float, + pnl_factor: float, + duration_ratio: float, + params: Dict[str, float | str], +) -> float: + """Public wrapper to compute the time-attenuated + pnl-scaled exit factor.""" + return _get_exit_factor(base_factor, pnl, pnl_factor, duration_ratio, params) + + def calculate_reward( context: RewardContext, params: Dict[str, float | str],