import math
import pickle
import random
+import warnings
from enum import Enum, IntEnum
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
return default
+def _piecewise_duration_divisor(
+ duration_ratio: float, params: Dict[str, float | str]
+) -> float:
+ """Compute divisor for piecewise attenuation (single source of truth).
+
+ Ensures consistent fallback behaviour across the primary code path and
+ exception fallback in ``_get_exit_factor`` without duplicating logic.
+ """
+ exit_piecewise_grace = _get_param_float(params, "exit_piecewise_grace", 1.0)
+ if not (0.0 <= exit_piecewise_grace <= 1.0): # sanitize grace range
+ exit_piecewise_grace = 1.0
+ exit_piecewise_slope = _get_param_float(params, "exit_piecewise_slope", 1.0)
+ if exit_piecewise_slope < 0.0: # sanitize slope sign
+ exit_piecewise_slope = 1.0
+ if duration_ratio <= exit_piecewise_grace:
+ return 1.0
+ return 1.0 + exit_piecewise_slope * (duration_ratio - exit_piecewise_grace)
+
+
def _compute_duration_ratio(trade_duration: int, max_trade_duration: int) -> float:
"""Compute duration ratio with safe division."""
return trade_duration / max(1, max_trade_duration)
# Idle penalty (env defaults)
"idle_penalty_power": 1.0,
"idle_penalty_scale": 1.0,
+ # If <=0 or unset, falls back to max_trade_duration_candles at runtime
+ "max_idle_duration_candles": 0,
# Holding keys (env defaults)
"holding_penalty_scale": 0.5,
"holding_penalty_power": 1.0,
# Profit factor params (env defaults)
"win_reward_factor": 2.0,
"pnl_factor_beta": 0.5,
+ # Invariant / safety controls (env defaults)
+ "check_invariants": True,
+ "exit_factor_threshold": 10000.0,
}
DEFAULT_MODEL_REWARD_PARAMETERS_HELP: Dict[str, str] = {
"base_factor": "Base reward factor used inside the environment.",
"idle_penalty_power": "Power applied to idle penalty scaling.",
"idle_penalty_scale": "Scale of idle penalty.",
+ "max_idle_duration_candles": "Maximum idle duration candles before full idle penalty scaling; 0 = use max_trade_duration_candles.",
"holding_penalty_scale": "Scale of holding penalty.",
"holding_penalty_power": "Power applied to holding penalty scaling.",
"exit_factor_mode": "Time attenuation mode for exit factor.",
"efficiency_center": "Center for efficiency factor sigmoid.",
"win_reward_factor": "Amplification for pnl above target.",
"pnl_factor_beta": "Sensitivity of amplification around target.",
+ "check_invariants": "Boolean flag (true/false) to enable runtime invariant & safety checks.",
+ "exit_factor_threshold": "If |exit factor| exceeds this threshold, emit warning.",
}
"base_factor": {"min": 0.0},
"idle_penalty_power": {"min": 0.0},
"idle_penalty_scale": {"min": 0.0},
+ "max_idle_duration_candles": {"min": 0.0},
"holding_penalty_scale": {"min": 0.0},
"holding_penalty_power": {"min": 0.0},
"exit_linear_slope": {"min": 0.0},
adjustments[key] = {
"original": original,
"adjusted": adjusted,
- "reason": float("nan"), # placeholder for JSON compatibility
+ "reason": ",".join(reason_parts), # textual reason directly
}
- # store textual reason separately to avoid type mixing
- adjustments[key]["_reason_text"] = ",".join(reason_parts)
return sanitized, adjustments
duration_ratio: float,
params: Dict[str, float | str],
) -> float:
- """Compute the complete exit factor including time-based attenuation and PnL factor.
-
- This mirrors ReforceXY._get_exit_factor() exactly:
- 1. Apply time-based attenuation to base factor
- 2. Multiply by pnl_factor at the end
-
- Exit factor mode formulas (explicit clarification for auditability):
- - legacy: factor *= 1.5 if duration_ratio <= 1 else *= 0.5
- - sqrt: factor /= sqrt(1 + duration_ratio)
- - linear: factor /= (1 + slope * duration_ratio)
- - power: alpha = -log(tau)/log(2) with tau in (0,1]; factor /= (1 + duration_ratio)^alpha
- - piecewise: grace region g in [0,1]; factor /= 1 if d<=g else (1 + slope*(d-g))
- - half_life: factor *= 2^(-duration_ratio/half_life)
+ """Compute the complete exit factor (time attenuation + PnL scaling).
+
+ Synchronization
+ ---------------
+ Mirrors the environment implementation `ReforceXY._get_exit_factor` (parity date: 2025-10-06).
+ Any upstream change MUST be ported here to keep analytical results consistent with live behavior.
+
+ Processing steps
+ ----------------
+ 1. Clamp negative ``duration_ratio`` to 0.
+ 2. Apply time-based attenuation according to ``exit_factor_mode``.
+ 3. Multiply by ``pnl_factor`` (already includes profit amplification & efficiency component).
+ 4. Enforce invariants (finite; non-negative when pnl>=0) and optionally emit a warning if
+ ``|factor| > exit_factor_threshold`` (warning-only, no capping).
+
+ Modes (attenuation formulas)
+ ----------------------------
+ legacy : factor *= 1.5 if r <= 1 else *= 0.5 (step change)
+ sqrt : factor /= sqrt(1 + r)
+ linear : factor /= (1 + slope * r)
+ power : alpha = -log(tau)/log(2) with tau in (0,1]; factor /= (1 + r)^alpha (fallback alpha=1)
+ piecewise : grace g in [0,1]; if r <= g then divisor=1 else divisor = 1 + slope * (r - g)
+ half_life : factor *= 2^(-r / half_life)
+
+ Fallback: Unknown modes default to piecewise.
+
+ Invariants
+ ----------
+ - Factor set to 0 if non-finite.
+ - Factor clamped to >=0 when pnl >= 0.
+ - Threshold exceedance triggers RuntimeWarning only.
+
+ Returns
+ -------
+ float : attenuated & pnl-scaled factor (reward exit component = pnl * factor).
"""
+ # Basic finiteness checks
if (
not math.isfinite(factor)
or not math.isfinite(pnl)
):
return 0.0
- exit_factor_mode = str(params.get("exit_factor_mode", "piecewise")).lower()
+ # Guard: duration ratio should never be negative
+ if duration_ratio < 0.0:
+ duration_ratio = 0.0
- if exit_factor_mode == "legacy":
- if duration_ratio <= 1.0:
- factor *= 1.5
- else:
- factor *= 0.5
- elif exit_factor_mode == "sqrt":
- factor /= math.sqrt(1.0 + duration_ratio)
- elif exit_factor_mode == "linear":
- slope = _get_param_float(params, "exit_linear_slope", 1.0)
- if slope < 0.0:
- slope = 1.0
- factor /= 1.0 + slope * duration_ratio
- elif exit_factor_mode == "power":
- exit_power_alpha = params.get("exit_power_alpha")
- if isinstance(exit_power_alpha, (int, float)) and exit_power_alpha < 0.0:
- exit_power_alpha = None
- if exit_power_alpha is None:
- exit_power_tau = params.get("exit_power_tau")
- if isinstance(exit_power_tau, (int, float)):
- exit_power_tau = float(exit_power_tau)
- if 0.0 < exit_power_tau <= 1.0:
- exit_power_alpha = -math.log(exit_power_tau) / _LOG_2
- if not isinstance(exit_power_alpha, (int, float)):
- exit_power_alpha = 1.0
- else:
- exit_power_alpha = float(exit_power_alpha)
- factor /= math.pow(1.0 + duration_ratio, exit_power_alpha)
- elif exit_factor_mode == "piecewise":
- exit_piecewise_grace = _get_param_float(params, "exit_piecewise_grace", 1.0)
- if not (0.0 <= exit_piecewise_grace <= 1.0):
- exit_piecewise_grace = 1.0
- exit_piecewise_slope = _get_param_float(params, "exit_piecewise_slope", 1.0)
- if exit_piecewise_slope < 0.0:
- exit_piecewise_slope = 1.0
- if duration_ratio <= exit_piecewise_grace:
- duration_divisor = 1.0
- else:
- duration_divisor = 1.0 + exit_piecewise_slope * (
- duration_ratio - exit_piecewise_grace
- )
- factor /= duration_divisor
- elif exit_factor_mode == "half_life":
- exit_half_life = _get_param_float(params, "exit_half_life", 0.5)
- if exit_half_life <= 0.0:
- exit_half_life = 0.5
- factor *= math.pow(2.0, -duration_ratio / exit_half_life)
+ exit_factor_mode = str(params.get("exit_factor_mode", "piecewise")).lower()
+ try:
+ if exit_factor_mode == "legacy":
+ factor *= 1.5 if duration_ratio <= 1.0 else 0.5
+ elif exit_factor_mode == "sqrt":
+ factor /= math.sqrt(1.0 + duration_ratio)
+ elif exit_factor_mode == "linear":
+ slope = _get_param_float(params, "exit_linear_slope", 1.0)
+ if slope < 0.0:
+ slope = 1.0
+ factor /= 1.0 + slope * duration_ratio
+ elif exit_factor_mode == "power":
+ exit_power_alpha = params.get("exit_power_alpha")
+ if isinstance(exit_power_alpha, (int, float)) and exit_power_alpha < 0.0:
+ exit_power_alpha = None
+ if exit_power_alpha is None:
+ exit_power_tau = params.get("exit_power_tau")
+ if isinstance(exit_power_tau, (int, float)):
+ exit_power_tau = float(exit_power_tau)
+ if 0.0 < exit_power_tau <= 1.0:
+ exit_power_alpha = -math.log(exit_power_tau) / _LOG_2
+ if not isinstance(exit_power_alpha, (int, float)):
+ exit_power_alpha = 1.0
+ else:
+ exit_power_alpha = float(exit_power_alpha)
+ factor /= math.pow(1.0 + duration_ratio, exit_power_alpha)
+ elif exit_factor_mode == "piecewise" or exit_factor_mode not in {
+ "legacy",
+ "sqrt",
+ "linear",
+ "power",
+ "half_life",
+ }:
+ # Default & fallback behaviour consolidated
+ factor /= _piecewise_duration_divisor(duration_ratio, params)
+ elif exit_factor_mode == "half_life":
+ exit_half_life = _get_param_float(params, "exit_half_life", 0.5)
+ if exit_half_life <= 0.0:
+ exit_half_life = 0.5
+ factor *= math.pow(2.0, -duration_ratio / exit_half_life)
+ except Exception:
+ # Safe fallback to piecewise logic if any unexpected error arises (centralized)
+ factor /= _piecewise_duration_divisor(duration_ratio, params)
+
+ # Apply pnl_factor after time attenuation
factor *= pnl_factor
+ # Invariant & safety checks
+ if _to_bool(params.get("check_invariants", True)):
+ if not math.isfinite(factor):
+ return 0.0
+ if factor < 0.0 and pnl >= 0.0:
+ # Clamp: avoid negative amplification on non-negative pnl
+ factor = 0.0
+ thr = params.get("exit_factor_threshold")
+ if isinstance(thr, (int, float)) and thr > 0 and math.isfinite(thr):
+ if abs(factor) > thr:
+ warnings.warn(
+ (
+ f"_get_exit_factor |factor|={abs(factor):.2f} exceeds threshold {thr:.2f}"
+ ),
+ RuntimeWarning,
+ stacklevel=2,
+ )
+
return factor
"""Mirror the environment's idle penalty behaviour."""
idle_penalty_scale = _get_param_float(params, "idle_penalty_scale", 1.0)
idle_penalty_power = _get_param_float(params, "idle_penalty_power", 1.0)
- max_idle_duration = int(
+ max_idle_duration_cfg = int(
params.get(
- "max_idle_duration_candles",
- max(1, context.max_trade_duration),
+ "max_idle_duration_candles", params.get("max_trade_duration_candles", 0)
)
)
+ # Fallback: align with documented intent -> use context.max_trade_duration when cfg <= 0
+ if max_idle_duration_cfg <= 0:
+ max_idle_duration = context.max_trade_duration
+ else:
+ max_idle_duration = max_idle_duration_cfg
idle_duration_ratio = context.idle_duration / max(1, max_idle_duration)
return -idle_factor * idle_penalty_scale * idle_duration_ratio**idle_penalty_power
context.trade_duration, context.max_trade_duration
)
exit_factor = _get_exit_factor(
- base_factor,
- context.pnl,
- pnl_factor,
- duration_ratio,
- params,
+ base_factor, context.pnl, pnl_factor, duration_ratio, params
)
return context.pnl * exit_factor
+def compute_exit_factor(
+ base_factor: float,
+ pnl: float,
+ pnl_factor: float,
+ duration_ratio: float,
+ params: Dict[str, float | str],
+) -> float:
+ """Public wrapper to compute the time-attenuated + pnl-scaled exit factor."""
+ return _get_exit_factor(base_factor, pnl, pnl_factor, duration_ratio, params)
+
+
def calculate_reward(
context: RewardContext,
params: Dict[str, float | str],